Skip to content

Commit 260e908

Browse files
Keep subscription fanout worker warm with adaptive linger policy
1 parent f672ae2 commit 260e908

File tree

3 files changed

+161
-2
lines changed

3 files changed

+161
-2
lines changed

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
1010
use crate::subscription::delta::eval_delta;
1111
use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool};
1212
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
13+
use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;
1314
use crate::worker_metrics::WORKER_METRICS;
1415
type V2EvalUpdatesResult = (Vec<V2ClientUpdate>, Vec<(SubscriptionIdV2, Box<str>)>, ExecutionMetrics);
1516
use core::mem;
@@ -37,6 +38,7 @@ use std::collections::BTreeMap;
3738
use std::fmt::Debug;
3839
use std::sync::atomic::{AtomicBool, Ordering};
3940
use std::sync::Arc;
41+
use std::time::Duration;
4042
use tokio::sync::{mpsc, oneshot};
4143

4244
/// Clients are uniquely identified by their Identity and ConnectionId.
@@ -1715,7 +1717,7 @@ impl SendWorkerClient {
17151717
/// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for motivation.
17161718
struct SendWorker {
17171719
/// Receiver end of the [`SubscriptionManager`]'s `send_worker_tx` channel.
1718-
rx: mpsc::UnboundedReceiver<SendWorkerMessage>,
1720+
rx: AdaptiveUnboundedReceiver<SendWorkerMessage>,
17191721

17201722
/// `subscription_send_queue_length` metric labeled for this database's `Identity`.
17211723
///
@@ -1756,6 +1758,12 @@ impl Drop for SendWorker {
17561758
}
17571759

17581760
impl SendWorker {
1761+
// Keep the worker warm briefly after handling a message so bursts do not
1762+
// pay a park/unpark cost on every enqueue, while still parking quickly
1763+
// once traffic goes quiet.
1764+
const BASELINE_LINGER: Duration = Duration::from_micros(25);
1765+
const MAX_LINGER: Duration = Duration::from_micros(500);
1766+
17591767
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
17601768
self.clients
17611769
.get(client_id)
@@ -1814,7 +1822,7 @@ impl SendWorker {
18141822
database_identity_to_clean_up_metric: Option<Identity>,
18151823
) -> Self {
18161824
Self {
1817-
rx,
1825+
rx: AdaptiveUnboundedReceiver::new(rx, Self::BASELINE_LINGER, Self::MAX_LINGER),
18181826
queue_length_metric,
18191827
clients: Default::default(),
18201828
database_identity_to_clean_up_metric,
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use std::time::Duration;
2+
3+
use tokio::sync::mpsc;
4+
use tokio::time::sleep;
5+
6+
/// Receives from a Tokio unbounded channel with an adaptive linger policy.
7+
///
8+
/// This helper is intended for single-consumer background workers that want
9+
/// to avoid parking on `recv()` after every message during bursty traffic.
10+
///
11+
/// The receiver has two modes - hot and cold. In cold mode it blocks on
12+
/// `recv()` until the next message arrives. In hot mode it prefers to stay
13+
/// awake, so after receiving a message, it will drain the channel, sleep for
14+
/// a short period (linger), and only then poll the channel again. This keeps
15+
/// the receiver off `recv()` during the linger window, so producers can enqueue
16+
/// more work without waking a parked task.
17+
///
18+
/// The linger policy is as follows: If work is present when a linger window
19+
/// expires, double the window up to `max_linger`. If a linger window expires
20+
/// and the queue is still empty, reset the window to `baseline_linger`.
21+
///
22+
/// Note, messages returned immediately by `try_recv()` do not count as hits,
23+
/// and do not double the linger window.
24+
#[derive(Debug)]
25+
pub struct AdaptiveUnboundedReceiver<T> {
26+
rx: mpsc::UnboundedReceiver<T>,
27+
linger: AdaptiveLinger,
28+
is_hot: bool,
29+
}
30+
31+
impl<T> AdaptiveUnboundedReceiver<T> {
32+
/// Create an adaptive receiver around a Tokio unbounded channel.
33+
///
34+
/// `baseline_linger` is the linger window used after a cold wakeup or any
35+
/// linger miss. `max_linger` caps how far the linger window may grow after
36+
/// repeated linger hits.
37+
///
38+
/// This constructor does not spawn any tasks and does not alter the
39+
/// channel's ordering semantics. It only configures how aggressively the
40+
/// consumer stays awake after work arrives.
41+
pub fn new(rx: mpsc::UnboundedReceiver<T>, baseline_linger: Duration, max_linger: Duration) -> Self {
42+
Self {
43+
rx,
44+
linger: AdaptiveLinger::new(baseline_linger, max_linger),
45+
is_hot: false,
46+
}
47+
}
48+
49+
/// Receive the next message while adapting how aggressively we linger
50+
/// before parking again.
51+
///
52+
/// Once a worker has been woken up by one message, subsequent calls try to
53+
/// stay on the hot path:
54+
///
55+
/// 1. Drain any already-queued work immediately with `try_recv()`
56+
/// 2. If the queue is empty, sleep for the current linger window
57+
/// 3. When the sleep fires, poll the queue again with `try_recv()`
58+
/// 4. On a linger hit, double the window and continue lingering
59+
/// 5. On a linger miss, reset the window to the baseline and park on `recv()`
60+
///
61+
/// This trades a small amount of hot-path latency for lower wake overhead.
62+
/// While the receiver is hot, senders enqueue into the channel without
63+
/// waking a parked `recv()` future.
64+
pub async fn recv(&mut self) -> Option<T> {
65+
loop {
66+
if !self.is_hot {
67+
let message = self.rx.recv().await?;
68+
self.is_hot = true;
69+
return Some(message);
70+
}
71+
72+
match self.rx.try_recv() {
73+
Ok(message) => return Some(message),
74+
Err(mpsc::error::TryRecvError::Disconnected) => return None,
75+
Err(mpsc::error::TryRecvError::Empty) => {}
76+
}
77+
78+
let linger = self.linger.current();
79+
if linger.is_zero() {
80+
self.cool_down();
81+
continue;
82+
}
83+
84+
sleep(linger).await;
85+
86+
match self.rx.try_recv() {
87+
Ok(message) => {
88+
self.linger.on_hit();
89+
return Some(message);
90+
}
91+
Err(mpsc::error::TryRecvError::Disconnected) => return None,
92+
Err(mpsc::error::TryRecvError::Empty) => {
93+
self.cool_down();
94+
}
95+
}
96+
}
97+
}
98+
99+
/// Return the receiver to its cold state after a linger miss.
100+
///
101+
/// The next call to [`Self::recv`] will block on the underlying channel
102+
/// instead of continuing to linger, and the linger policy is reset to its
103+
/// baseline window.
104+
fn cool_down(&mut self) {
105+
self.is_hot = false;
106+
self.linger.on_miss();
107+
}
108+
}
109+
110+
#[derive(Debug)]
111+
struct AdaptiveLinger {
112+
baseline: Duration,
113+
current: Duration,
114+
max: Duration,
115+
}
116+
117+
impl AdaptiveLinger {
118+
/// Create a linger policy with a baseline window and an upper bound.
119+
///
120+
/// `baseline` is the window restored after any linger miss. `max` caps how
121+
/// far the window may grow after repeated linger hits.
122+
fn new(baseline: Duration, max: Duration) -> Self {
123+
assert!(
124+
baseline <= max,
125+
"baseline linger ({baseline:?}) must not exceed max linger ({max:?})"
126+
);
127+
Self {
128+
baseline,
129+
current: baseline,
130+
max,
131+
}
132+
}
133+
134+
/// Return the current linger window.
135+
fn current(&self) -> Duration {
136+
self.current
137+
}
138+
139+
/// Record a linger hit by growing the next linger window.
140+
///
141+
/// The window doubles on each hit until it reaches `self.max`.
142+
fn on_hit(&mut self) {
143+
self.current = self.current.saturating_mul(2).min(self.max);
144+
}
145+
146+
/// Record a linger miss by resetting to the baseline window.
147+
fn on_miss(&mut self) {
148+
self.current = self.baseline;
149+
}
150+
}

crates/core/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tracing::Span;
66

77
pub mod prometheus_handle;
88

9+
pub mod adaptive_recv;
910
pub mod jobs;
1011
pub mod notify_once;
1112
pub mod thread_scheduling;

0 commit comments

Comments
 (0)