Skip to content

Commit 4e54331

Browse files
v3 websocket protocol
1 parent f309d05 commit 4e54331

File tree

11 files changed

+158
-54
lines changed

11 files changed

+158
-54
lines changed

crates/client-api-messages/DEVELOP.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,8 @@ spacetime generate -p spacetimedb-cli --lang <SDK lang> \
1919
--out-dir <sdk WebSocket schema bindings dir> \
2020
--module-def ws_schema_v2.json
2121
```
22+
23+
Note, the v3 WebSocket protocol does not have a separate schema.
24+
It reuses the v2 message schema and only changes the websocket binary framing.
25+
In v2 for example, a WebSocket frame contained a single BSATN-encoded v2 message,
26+
but in v3, a single WebSocket frame may contain a batch of one or more v2 messages.

crates/client-api-messages/src/websocket.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@
1717
pub mod common;
1818
pub mod v1;
1919
pub mod v2;
20+
pub mod v3;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//! Binary framing for websocket protocol v3.
2+
//!
3+
//! Unlike v2, v3 does not define a new outer message schema.
4+
//! A single binary websocket payload contains one or more BSATN-encoded
5+
//! [`crate::websocket::v2::ClientMessage`] values from client to server,
6+
//! or one or more consecutive BSATN-encoded [`crate::websocket::v2::ServerMessage`]
7+
//! values from server to client.
8+
//!
9+
//! Client and server may coalesce multiple messages into one websocket payload,
10+
//! or send them separately, regardless of what the other one does,
11+
//! so long as logical order is preserved.
12+
13+
pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb";

crates/client-api/src/routes/subscribe.rs

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS;
3838
use spacetimedb::Identity;
3939
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
4040
use spacetimedb_client_api_messages::websocket::v2 as ws_v2;
41+
use spacetimedb_client_api_messages::websocket::v3 as ws_v3;
4142
use spacetimedb_datastore::execution_context::WorkloadType;
4243
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
4344
use tokio::sync::{mpsc, watch};
@@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT
6263
pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL);
6364
#[allow(clippy::declare_interior_mutable_const)]
6465
pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL);
66+
#[allow(clippy::declare_interior_mutable_const)]
67+
pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL);
6568

6669
pub trait HasWebSocketOptions {
6770
fn websocket_options(&self) -> WebSocketOptions;
@@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option<bool>)
101104
}
102105
match version {
103106
WsVersion::V1 => false,
104-
WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS,
107+
WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS,
105108
}
106109
}
107110

@@ -151,6 +154,13 @@ where
151154
}
152155

153156
let (res, ws_upgrade, protocol) = ws.select_protocol([
157+
(
158+
V3_BIN_PROTOCOL,
159+
NegotiatedProtocol {
160+
protocol: Protocol::Binary,
161+
version: WsVersion::V3,
162+
},
163+
),
154164
(
155165
V2_BIN_PROTOCOL,
156166
NegotiatedProtocol {
@@ -284,7 +294,7 @@ where
284294
};
285295
client.send_message(None, OutboundMessage::V1(message.into()))
286296
}
287-
WsVersion::V2 => {
297+
WsVersion::V2 | WsVersion::V3 => {
288298
let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection {
289299
identity: client_identity,
290300
connection_id,
@@ -1296,7 +1306,7 @@ async fn ws_encode_task(
12961306
let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY);
12971307
let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY);
12981308

1299-
while let Some(message) = messages.recv().await {
1309+
'send: while let Some(message) = messages.recv().await {
13001310
// Drop serialize buffers with no external referent,
13011311
// returning them to the pool.
13021312
in_use_bufs.retain(|in_use| !in_use.is_unique());
@@ -1306,55 +1316,62 @@ async fn ws_encode_task(
13061316

13071317
let in_use_buf = match message {
13081318
OutboundWsMessage::Error(message) => {
1309-
if config.version == WsVersion::V2 {
1310-
log::error!("dropping v1 error message sent to a v2 client: {:?}", message);
1319+
if config.version != WsVersion::V1 {
1320+
log::error!(
1321+
"dropping v1 error message sent to a binary websocket client: {:?}",
1322+
message
1323+
);
13111324
continue;
13121325
}
1313-
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await;
1314-
metrics.report(None, None, stats);
1315-
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
1316-
break;
1317-
}
1318-
1326+
let Ok(in_use) = ws_forward_frames(
1327+
&metrics,
1328+
&outgoing_frames,
1329+
None,
1330+
None,
1331+
ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await,
1332+
) else {
1333+
break 'send;
1334+
};
13191335
in_use
13201336
}
13211337
OutboundWsMessage::Message(message) => {
13221338
let workload = message.workload();
13231339
let num_rows = message.num_rows();
13241340
match message {
13251341
OutboundMessage::V2(server_message) => {
1326-
if config.version != WsVersion::V2 {
1342+
if config.version == WsVersion::V1 {
13271343
log::error!("dropping v2 message on v1 connection");
13281344
continue;
13291345
}
13301346

1331-
let (stats, in_use, mut frames) =
1332-
ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await;
1333-
metrics.report(workload, num_rows, stats);
1334-
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
1335-
break;
1336-
}
1337-
1347+
let Ok(in_use) = ws_forward_frames(
1348+
&metrics,
1349+
&outgoing_frames,
1350+
workload,
1351+
num_rows,
1352+
ws_encode_binary_message(config, buf, server_message, false, &bsatn_rlb_pool).await,
1353+
) else {
1354+
break 'send;
1355+
};
13381356
in_use
13391357
}
13401358
OutboundMessage::V1(message) => {
1341-
if config.version == WsVersion::V2 {
1342-
log::error!(
1343-
"dropping v1 message for v2 connection until v2 serialization is implemented: {:?}",
1344-
message
1345-
);
1359+
if config.version != WsVersion::V1 {
1360+
log::error!("dropping v1 message for a binary websocket connection: {:?}", message);
13461361
continue;
13471362
}
13481363

13491364
let is_large = num_rows.is_some_and(|n| n > 1024);
13501365

1351-
let (stats, in_use, mut frames) =
1352-
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await;
1353-
metrics.report(workload, num_rows, stats);
1354-
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
1355-
break;
1356-
}
1357-
1366+
let Ok(in_use) = ws_forward_frames(
1367+
&metrics,
1368+
&outgoing_frames,
1369+
workload,
1370+
num_rows,
1371+
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await,
1372+
) else {
1373+
break 'send;
1374+
};
13581375
in_use
13591376
}
13601377
}
@@ -1370,6 +1387,21 @@ async fn ws_encode_task(
13701387
}
13711388
}
13721389

1390+
/// Reports encode metrics for an already-encoded message and forwards its
1391+
/// frames to the websocket send task.
1392+
fn ws_forward_frames(
1393+
metrics: &SendMetrics,
1394+
outgoing_frames: &mpsc::UnboundedSender<Frame>,
1395+
workload: Option<WorkloadType>,
1396+
num_rows: Option<usize>,
1397+
encoded: (EncodeMetrics, InUseSerializeBuffer, impl IntoIterator<Item = Frame>),
1398+
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>> {
1399+
let (stats, in_use, frames) = encoded;
1400+
metrics.report(workload, num_rows, stats);
1401+
frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?;
1402+
Ok(in_use)
1403+
}
1404+
13731405
/// Some stats about serialization and compression.
13741406
///
13751407
/// Returned by [`ws_encode_message`].
@@ -1443,21 +1475,21 @@ async fn ws_encode_message(
14431475
(metrics, msg_alloc, frames)
14441476
}
14451477

1446-
#[allow(dead_code, unused_variables)]
1447-
async fn ws_encode_message_v2(
1478+
async fn ws_encode_binary_message(
14481479
config: ClientConfig,
14491480
buf: SerializeBuffer,
14501481
message: ws_v2::ServerMessage,
14511482
is_large_message: bool,
14521483
bsatn_rlb_pool: &BsatnRowListBuilderPool,
14531484
) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame> + use<>) {
14541485
let start = Instant::now();
1486+
let compression = config.compression;
14551487

14561488
let (in_use, data) = if is_large_message {
14571489
let bsatn_rlb_pool = bsatn_rlb_pool.clone();
1458-
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await
1490+
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, compression)).await
14591491
} else {
1460-
serialize_v2(bsatn_rlb_pool, buf, message, config.compression)
1492+
serialize_v2(bsatn_rlb_pool, buf, message, compression)
14611493
};
14621494

14631495
let metrics = EncodeMetrics {
@@ -2298,9 +2330,11 @@ mod tests {
22982330

22992331
#[test]
23002332
fn confirmed_reads_default_depends_on_ws_version() {
2333+
assert!(resolve_confirmed_reads_default(WsVersion::V3, None));
23012334
assert!(resolve_confirmed_reads_default(WsVersion::V2, None));
23022335
assert!(!resolve_confirmed_reads_default(WsVersion::V1, None));
23032336
assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true)));
2337+
assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false)));
23042338
assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false)));
23052339
}
23062340

crates/core/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod consume_each_list;
77
mod message_handlers;
88
mod message_handlers_v1;
99
mod message_handlers_v2;
10+
mod message_handlers_v3;
1011
pub mod messages;
1112

1213
pub use client_connection::{

crates/core/src/client/client_connection.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub enum Protocol {
4747
pub enum WsVersion {
4848
V1,
4949
V2,
50+
V3,
5051
}
5152

5253
impl Protocol {
@@ -384,7 +385,7 @@ impl ClientConnectionSender {
384385
debug_assert!(
385386
matches!(
386387
(&self.config.version, &message),
387-
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_))
388+
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_))
388389
),
389390
"attempted to send message variant that does not match client websocket version"
390391
);

crates/core/src/client/message_handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
2323
match client.config.version {
2424
WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await,
2525
WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await,
26+
WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await,
2627
}
2728
}

crates/core/src/client/message_handlers_v2.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
2020
)))
2121
}
2222
};
23+
handle_decoded_message(client, message, timer).await
24+
}
25+
26+
pub(super) async fn handle_decoded_message(
27+
client: &ClientConnection,
28+
message: ws_v2::ClientMessage,
29+
timer: Instant,
30+
) -> Result<(), MessageHandleError> {
2331
let module = client.module();
2432
let mod_info = module.info();
2533
let mod_metrics = &mod_info.metrics;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use super::{ClientConnection, DataMessage, MessageHandleError};
2+
use serde::de::Error as _;
3+
use spacetimedb_lib::bsatn;
4+
use std::time::Instant;
5+
6+
const EMPTY_V3_PAYLOAD_ERR: &str = "v3 websocket binary payload must contain at least one v2 client message";
7+
8+
pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
9+
client.observe_websocket_request_message(&message);
10+
match message {
11+
DataMessage::Binary(message_buf) => {
12+
let mut remaining = &message_buf[..];
13+
14+
if remaining.is_empty() {
15+
return Err(bsatn::DecodeError::Other(EMPTY_V3_PAYLOAD_ERR.into()).into());
16+
}
17+
18+
loop {
19+
let message = bsatn::from_reader(&mut remaining)?;
20+
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
21+
if remaining.is_empty() {
22+
break;
23+
}
24+
}
25+
}
26+
DataMessage::Text(_) => {
27+
return Err(MessageHandleError::TextDecode(serde_json::Error::custom(
28+
"v3 websocket does not support text messages",
29+
)))
30+
}
31+
}
32+
33+
Ok(())
34+
}

crates/core/src/client/messages.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,25 @@ impl SerializeBuffer {
9797
}
9898
}
9999

100+
/// Finalize a binary websocket payload by optionally compressing the serialized
101+
/// bytes after the caller has written the protocol-specific payload body.
102+
///
103+
/// Callers are responsible for writing the compression tag before invoking this
104+
/// helper.
105+
fn finalize_binary_serialize_buffer(
106+
buffer: SerializeBuffer,
107+
uncompressed_len: usize,
108+
compression: ws_v1::Compression,
109+
) -> (InUseSerializeBuffer, Bytes) {
110+
match decide_compression(uncompressed_len, compression) {
111+
ws_v1::Compression::None => buffer.uncompressed(),
112+
ws_v1::Compression::Brotli => {
113+
buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress)
114+
}
115+
ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
116+
}
117+
}
118+
100119
type BytesMutWriter<'a> = bytes::buf::Writer<&'a mut BytesMut>;
101120

102121
pub enum InUseSerializeBuffer {
@@ -159,28 +178,20 @@ pub fn serialize(
159178
let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| {
160179
bsatn::to_writer(w.into_inner(), &msg).unwrap()
161180
});
181+
let srv_msg_len = srv_msg.len();
162182

163183
// At this point, we no longer have a use for `msg`,
164184
// so try to reclaim its buffers.
165185
msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
166186

167187
// Conditionally compress the message.
168-
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
169-
ws_v1::Compression::None => buffer.uncompressed(),
170-
ws_v1::Compression::Brotli => {
171-
buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress)
172-
}
173-
ws_v1::Compression::Gzip => {
174-
buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress)
175-
}
176-
};
188+
let (in_use, msg_bytes) = finalize_binary_serialize_buffer(buffer, srv_msg_len, config.compression);
177189
(in_use, msg_bytes.into())
178190
}
179191
}
180192
}
181193

182194
/// Serialize `msg` into a [`DataMessage`] containing a [`ws_v2::ServerMessage`].
183-
///
184195
/// This mirrors the v1 framing by prepending the compression tag and applying
185196
/// conditional compression when configured.
186197
pub fn serialize_v2(
@@ -192,18 +203,13 @@ pub fn serialize_v2(
192203
let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| {
193204
bsatn::to_writer(w.into_inner(), &msg).expect("should be able to bsatn encode v2 message");
194205
});
206+
let srv_msg_len = srv_msg.len();
195207

196208
// At this point, we no longer have a use for `msg`,
197209
// so try to reclaim its buffers.
198210
msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
199211

200-
match decide_compression(srv_msg.len(), compression) {
201-
ws_v1::Compression::None => buffer.uncompressed(),
202-
ws_v1::Compression::Brotli => {
203-
buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress)
204-
}
205-
ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
206-
}
212+
finalize_binary_serialize_buffer(buffer, srv_msg_len, compression)
207213
}
208214

209215
#[derive(Debug, From)]

0 commit comments

Comments
 (0)