Skip to content

Commit 0635ef8

Browse files
Keep subscription fanout worker warm with adaptive linger policy
1 parent 30b6cee commit 0635ef8

File tree

3 files changed

+156
-2
lines changed

3 files changed

+156
-2
lines changed

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
1010
use crate::subscription::delta::eval_delta;
1111
use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool};
1212
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
13+
use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;
1314
use crate::worker_metrics::WORKER_METRICS;
1415
type V2EvalUpdatesResult = (Vec<V2ClientUpdate>, Vec<(SubscriptionIdV2, Box<str>)>, ExecutionMetrics);
1516
use core::mem;
@@ -37,6 +38,7 @@ use std::collections::BTreeMap;
3738
use std::fmt::Debug;
3839
use std::sync::atomic::{AtomicBool, Ordering};
3940
use std::sync::Arc;
41+
use std::time::Duration;
4042
use tokio::sync::{mpsc, oneshot};
4143

4244
/// Clients are uniquely identified by their Identity and ConnectionId.
@@ -1715,7 +1717,7 @@ impl SendWorkerClient {
17151717
/// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for motivation.
17161718
struct SendWorker {
17171719
/// Receiver end of the [`SubscriptionManager`]'s `send_worker_tx` channel.
1718-
rx: mpsc::UnboundedReceiver<SendWorkerMessage>,
1720+
rx: AdaptiveUnboundedReceiver<SendWorkerMessage>,
17191721

17201722
/// `subscription_send_queue_length` metric labeled for this database's `Identity`.
17211723
///
@@ -1756,6 +1758,12 @@ impl Drop for SendWorker {
17561758
}
17571759

17581760
impl SendWorker {
1761+
// Keep the worker warm briefly after handling a message so bursts do not
1762+
// pay a park/unpark cost on every enqueue, while still parking quickly
1763+
// once traffic goes quiet.
1764+
const BASELINE_LINGER: Duration = Duration::from_micros(50);
1765+
const MAX_LINGER: Duration = Duration::from_micros(500);
1766+
17591767
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
17601768
self.clients
17611769
.get(client_id)
@@ -1814,7 +1822,7 @@ impl SendWorker {
18141822
database_identity_to_clean_up_metric: Option<Identity>,
18151823
) -> Self {
18161824
Self {
1817-
rx,
1825+
rx: AdaptiveUnboundedReceiver::new(rx, Self::BASELINE_LINGER, Self::MAX_LINGER),
18181826
queue_length_metric,
18191827
clients: Default::default(),
18201828
database_identity_to_clean_up_metric,
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::time::Duration;
2+
3+
use tokio::sync::mpsc;
4+
use tokio::time::timeout;
5+
6+
/// Receives from a Tokio unbounded channel with an adaptive linger policy.
7+
///
8+
/// This helper is intended for single-consumer background workers that want
9+
/// to avoid parking on `recv()` after every message during bursty traffic.
10+
///
11+
/// The receiver has two modes - hot and cold. In cold mode it blocks on
12+
/// `recv()` until the next message arrives. In hot mode it prefers to stay
13+
/// awake, so after receiving a message, it will drain the channel and wait
14+
/// a short period (linger) before falling back to cold mode.
15+
///
16+
/// The linger policy is as follows: If a message arrives while we are in
17+
/// the linger window, double the window up to `max_linger`. If the linger
18+
/// timer expires at any point without receiving a new message, reset the
19+
/// window to `baseline_linger`.
20+
///
21+
/// Note, messages returned immediately by `try_recv()` do not count as hits,
22+
/// and do not double the linger window.
23+
#[derive(Debug)]
24+
pub struct AdaptiveUnboundedReceiver<T> {
25+
rx: mpsc::UnboundedReceiver<T>,
26+
linger: AdaptiveLinger,
27+
is_hot: bool,
28+
}
29+
30+
impl<T> AdaptiveUnboundedReceiver<T> {
31+
/// Create an adaptive receiver around a Tokio unbounded channel.
32+
///
33+
/// `baseline_linger` is the linger window used after a cold wakeup or any
34+
/// linger miss. `max_linger` caps how far the linger window may grow after
35+
/// repeated linger hits.
36+
///
37+
/// This constructor does not spawn any tasks and does not alter the
38+
/// channel's ordering semantics. It only configures how aggressively the
39+
/// consumer stays awake after work arrives.
40+
pub fn new(rx: mpsc::UnboundedReceiver<T>, baseline_linger: Duration, max_linger: Duration) -> Self {
41+
Self {
42+
rx,
43+
linger: AdaptiveLinger::new(baseline_linger, max_linger),
44+
is_hot: false,
45+
}
46+
}
47+
48+
/// Receive the next message while adapting how aggressively we linger
49+
/// before parking again.
50+
///
51+
/// Once a worker has been woken up by one message, subsequent calls try to
52+
/// stay on the hot path:
53+
///
54+
/// 1. Drain any already-queued work immediately with `try_recv()`
55+
/// 2. If the queue is empty, wait for the current linger window
56+
/// 3. On a linger hit, double the window and continue lingering
57+
/// 4. On a linger miss, reset the window to the baseline and park on `recv()`
58+
///
59+
/// This minimizes latency during periods of low activity but maximizes
60+
/// throughput during periods of high activity.
61+
pub async fn recv(&mut self) -> Option<T> {
62+
loop {
63+
if !self.is_hot {
64+
let message = self.rx.recv().await?;
65+
self.is_hot = true;
66+
return Some(message);
67+
}
68+
69+
match self.rx.try_recv() {
70+
Ok(message) => return Some(message),
71+
Err(mpsc::error::TryRecvError::Disconnected) => return None,
72+
Err(mpsc::error::TryRecvError::Empty) => {}
73+
}
74+
75+
let linger = self.linger.current();
76+
if linger.is_zero() {
77+
self.cool_down();
78+
continue;
79+
}
80+
81+
match timeout(linger, self.rx.recv()).await {
82+
Ok(Some(message)) => {
83+
self.linger.on_hit();
84+
return Some(message);
85+
}
86+
Ok(None) => return None,
87+
Err(_) => {
88+
self.cool_down();
89+
}
90+
}
91+
}
92+
}
93+
94+
/// Return the receiver to its cold state after a linger miss.
95+
///
96+
/// The next call to [`Self::recv`] will block on the underlying channel
97+
/// instead of continuing to linger, and the linger policy is reset to its
98+
/// baseline window.
99+
fn cool_down(&mut self) {
100+
self.is_hot = false;
101+
self.linger.on_miss();
102+
}
103+
}
104+
105+
#[derive(Debug)]
106+
struct AdaptiveLinger {
107+
baseline: Duration,
108+
current: Duration,
109+
max: Duration,
110+
}
111+
112+
impl AdaptiveLinger {
113+
/// Create a linger policy with a baseline window and an upper bound.
114+
///
115+
/// `baseline` is the window restored after any linger miss. `max` caps how
116+
/// far the window may grow after repeated linger hits.
117+
fn new(baseline: Duration, max: Duration) -> Self {
118+
assert!(
119+
baseline <= max,
120+
"baseline linger ({baseline:?}) must not exceed max linger ({max:?})"
121+
);
122+
Self {
123+
baseline,
124+
current: baseline,
125+
max,
126+
}
127+
}
128+
129+
/// Return the current linger window.
130+
fn current(&self) -> Duration {
131+
self.current
132+
}
133+
134+
/// Record a linger hit by growing the next linger window.
135+
///
136+
/// The window doubles on each hit until it reaches `self.max`.
137+
fn on_hit(&mut self) {
138+
self.current = self.current.saturating_mul(2).min(self.max);
139+
}
140+
141+
/// Record a linger miss by resetting to the baseline window.
142+
fn on_miss(&mut self) {
143+
self.current = self.baseline;
144+
}
145+
}

crates/core/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tracing::Span;
66

77
pub mod prometheus_handle;
88

9+
pub mod adaptive_recv;
910
pub mod jobs;
1011
pub mod notify_once;
1112
pub mod thread_scheduling;

0 commit comments

Comments
 (0)