Skip to content

Commit 2b3aa5a

Browse files
durability: Use async-channel to allow blocking send (#4802)
The previous approaches would either:

- panic when the queue becomes full, as `append_tx` is run inside the context of a `LocalSet`, which is basically a glorified current-thread runtime
- deadlock, because the receiver runtime has no way of notifying the sender of freed capacity in the channel

`async-channel` handles wait queues and notifications internally, so it can be used freely from either blocking or async contexts.

This _may_ come with different performance characteristics, but I haven't measured them.

---------

Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
1 parent 7b3bc01 commit 2b3aa5a

File tree

6 files changed

+82
-443
lines changed

6 files changed

+82
-443
lines changed

Cargo.lock

Lines changed: 49 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] }
153153
anyhow = "1.0.68"
154154
anymap = "0.12"
155155
arrayvec = "0.7.2"
156+
async-channel = "2.5"
156157
async-stream = "0.3.6"
157158
async-trait = "0.1.68"
158159
axum = { version = "0.7", features = ["tracing"] }

crates/durability/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"]
1313

1414
[dependencies]
1515
anyhow.workspace = true
16+
async-channel.workspace = true
1617
futures.workspace = true
1718
itertools.workspace = true
1819
log.workspace = true

crates/durability/src/imp/local.rs

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub struct Local<T> {
9797
///
9898
/// The queue is bounded to
9999
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
100-
queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
100+
queue: async_channel::Sender<PreparedTx<Txdata<T>>>,
101101
/// How many transactions are pending durability, including items buffered
102102
/// in the queue and items currently being written by the actor.
103103
///
@@ -137,7 +137,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
137137
on_new_segment,
138138
)?);
139139
let queue_capacity = opts.queue_capacity();
140-
let (queue, txdata_rx) = mpsc::channel(queue_capacity);
140+
let (queue, txdata_rx) = async_channel::bounded(queue_capacity);
141141
let queue_depth = Arc::new(AtomicU64::new(0));
142142
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
143143
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
@@ -218,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
218218
#[instrument(name = "durability::local::actor", skip_all)]
219219
async fn run(
220220
self,
221-
mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
221+
transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>,
222222
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
223223
) {
224224
info!("starting durability actor");
@@ -244,7 +244,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
244244
// potentially requiring the `tx_buf` to allocate additional
245245
// capacity.
246246
// We'll reclaim capacity in excess of `self.batch_size` below.
247-
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
247+
n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => {
248248
if n == 0 {
249249
break;
250250
}
@@ -344,29 +344,8 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
344344
type TxData = Txdata<T>;
345345

346346
fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
347-
let mut tx = Some(tx);
348-
let blocked = match self.queue.try_reserve() {
349-
Ok(permit) => {
350-
permit.send(tx.take().expect("tx already sent"));
351-
false
352-
}
353-
Err(mpsc::error::TrySendError::Closed(_)) => {
354-
panic!("durability actor crashed");
355-
}
356-
Err(mpsc::error::TrySendError::Full(_)) => {
357-
let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
358-
if tokio::runtime::Handle::try_current().is_ok() {
359-
tokio::task::block_in_place(send)
360-
} else {
361-
send()
362-
}
363-
.expect("durability actor crashed");
364-
true
365-
}
366-
};
367-
347+
self.queue.send_blocking(tx).expect("local durability: actor vanished");
368348
self.queue_depth.fetch_add(1, Relaxed);
369-
let _ = blocked;
370349
}
371350

372351
fn durable_tx_offset(&self) -> DurableOffset {
@@ -436,3 +415,28 @@ impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
436415
(min, max)
437416
}
438417
}
418+
419+
/// Implement tokio's `recv_many` for an `async_channel` receiver.
420+
async fn recv_many<T>(chan: &async_channel::Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
421+
let mut n = 0;
422+
if !chan.is_empty() {
423+
buf.reserve(chan.len().min(limit));
424+
while n < limit {
425+
let Ok(val) = chan.try_recv() else {
426+
break;
427+
};
428+
buf.push(val);
429+
n += 1;
430+
}
431+
}
432+
433+
if n == 0 {
434+
let Ok(val) = chan.recv().await else {
435+
return n;
436+
};
437+
buf.push(val);
438+
n += 1;
439+
}
440+
441+
n
442+
}

0 commit comments

Comments
 (0)