Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

losing my mind for a little producer-consumer

i........ i switched flume to a bounded channel so the send calls became blocking in async context and then nothing made sense for a long time 🙃🙃🙃🙃🙃

flume is gone now; tokio's built-in mpsc should be fine and harder to mess up in an async context.

+303 -20
Cargo.lock
···
]
[[package]]
+
name = "allocator-api2"
+
version = "0.2.21"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
+
+
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "async-lock"
+
version = "3.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
+
dependencies = [
+
"event-listener",
+
"event-listener-strategy",
+
"pin-project-lite",
+
]
+
+
[[package]]
name = "async-trait"
version = "0.1.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
name = "atrium-api"
-
version = "0.24.10"
+
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9c5d74937642f6b21814e82d80f54d55ebd985b681bffbe27c8a76e726c3c4db"
+
checksum = "ea3ea578c768ec91082e424a8d139517b2cb5c75149bf3cec04371a1e74f00f2"
dependencies = [
+
"atrium-common",
"atrium-xrpc",
"chrono",
"http",
···
"serde_json",
"thiserror 1.0.69",
"trait-variant",
+
]
+
+
[[package]]
+
name = "atrium-common"
+
version = "0.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "168e558408847bfed69df1033a32fd051f7a037ebc90ea46e588ccb2bfbd7233"
+
dependencies = [
+
"dashmap",
+
"lru",
+
"moka",
+
"thiserror 1.0.69",
+
"tokio",
+
"trait-variant",
+
"web-time",
]
[[package]]
···
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
+
name = "concurrent-queue"
+
version = "2.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
+
dependencies = [
+
"crossbeam-utils",
+
]
+
+
[[package]]
name = "constellation"
version = "0.1.0"
dependencies = [
···
]
[[package]]
+
name = "crossbeam-channel"
+
version = "0.5.14"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
+
dependencies = [
+
"crossbeam-utils",
+
]
+
+
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "dashmap"
+
version = "6.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+
dependencies = [
+
"cfg-if",
+
"crossbeam-utils",
+
"hashbrown 0.14.5",
+
"lock_api",
+
"once_cell",
+
"parking_lot_core",
+
]
+
+
[[package]]
name = "data-encoding"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "event-listener"
+
version = "5.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
+
dependencies = [
+
"concurrent-queue",
+
"parking",
+
"pin-project-lite",
+
]
+
+
[[package]]
+
name = "event-listener-strategy"
+
version = "0.5.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2"
+
dependencies = [
+
"event-listener",
+
"pin-project-lite",
+
]
+
+
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
-
"futures-core",
-
"futures-sink",
-
"nanorand",
"spin",
]
···
]
[[package]]
+
name = "generator"
+
version = "0.8.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd"
+
dependencies = [
+
"cfg-if",
+
"libc",
+
"log",
+
"rustversion",
+
"windows",
+
]
+
+
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
-
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
-
"wasm-bindgen",
]
[[package]]
···
[[package]]
name = "hashbrown"
+
version = "0.14.5"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+
+
[[package]]
+
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
+
"allocator-api2",
+
"equivalent",
"foldhash",
···
"atrium-api",
"chrono",
"clap",
-
"flume",
"futures-util",
"log",
"serde",
···
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
+
name = "loom"
+
version = "0.7.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
+
dependencies = [
+
"cfg-if",
+
"generator",
+
"scoped-tls",
+
"tracing",
+
"tracing-subscriber",
+
]
+
+
[[package]]
+
name = "lru"
+
version = "0.12.5"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
+
dependencies = [
+
"hashbrown 0.15.2",
+
]
+
+
[[package]]
name = "lz4-sys"
version = "1.11.1+lz4-1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709"
dependencies = [
"libc",
+
]
+
+
[[package]]
+
name = "matchers"
+
version = "0.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+
dependencies = [
+
"regex-automata 0.1.10",
[[package]]
···
[[package]]
+
name = "moka"
+
version = "0.12.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926"
+
dependencies = [
+
"async-lock",
+
"crossbeam-channel",
+
"crossbeam-epoch",
+
"crossbeam-utils",
+
"event-listener",
+
"futures-util",
+
"loom",
+
"parking_lot",
+
"portable-atomic",
+
"rustc_version",
+
"smallvec",
+
"tagptr",
+
"thiserror 1.0.69",
+
"uuid",
+
]
+
+
[[package]]
name = "multibase"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"core2",
"serde",
"unsigned-varint",
-
]
-
-
[[package]]
-
name = "nanorand"
-
version = "0.7.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
-
dependencies = [
-
"getrandom 0.2.15",
[[package]]
···
[[package]]
+
name = "nu-ansi-term"
+
version = "0.46.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
+
dependencies = [
+
"overload",
+
"winapi",
+
]
+
+
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"pkg-config",
"vcpkg",
+
+
[[package]]
+
name = "overload"
+
version = "0.1.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
+
[[package]]
+
name = "parking"
+
version = "2.2.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
···
dependencies = [
"aho-corasick",
"memchr",
-
"regex-automata",
-
"regex-syntax",
+
"regex-automata 0.4.9",
+
"regex-syntax 0.8.5",
+
]
+
+
[[package]]
+
name = "regex-automata"
+
version = "0.1.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+
dependencies = [
+
"regex-syntax 0.6.29",
[[package]]
···
dependencies = [
"aho-corasick",
"memchr",
-
"regex-syntax",
+
"regex-syntax 0.8.5",
+
+
[[package]]
+
name = "regex-syntax"
+
version = "0.6.29"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
···
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
+
name = "rustc_version"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+
dependencies = [
+
"semver",
+
]
+
+
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "scoped-tls"
+
version = "1.0.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
+
+
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"core-foundation-sys",
"libc",
+
+
[[package]]
+
name = "semver"
+
version = "1.0.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]]
name = "serde"
···
[[package]]
+
name = "sharded-slab"
+
version = "0.1.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
+
dependencies = [
+
"lazy_static",
+
]
+
+
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "tagptr"
+
version = "0.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
+
+
[[package]]
name = "tempfile"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "thread_local"
+
version = "1.1.8"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
+
dependencies = [
+
"cfg-if",
+
"once_cell",
+
]
+
+
[[package]]
name = "time"
version = "0.3.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
+
"valuable",
+
]
+
+
[[package]]
+
name = "tracing-log"
+
version = "0.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
+
dependencies = [
+
"log",
+
"once_cell",
+
"tracing-core",
+
]
+
+
[[package]]
+
name = "tracing-subscriber"
+
version = "0.3.19"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
+
dependencies = [
+
"matchers",
+
"nu-ansi-term",
+
"once_cell",
+
"regex",
+
"sharded-slab",
+
"smallvec",
+
"thread_local",
+
"tracing",
+
"tracing-core",
+
"tracing-log",
[[package]]
···
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
+
name = "uuid"
+
version = "1.15.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587"
+
dependencies = [
+
"getrandom 0.3.1",
+
]
+
+
[[package]]
+
name = "valuable"
+
version = "0.1.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
+
+
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
+
dependencies = [
+
"js-sys",
+
"wasm-bindgen",
+
]
+
+
[[package]]
+
name = "web-time"
+
version = "1.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
+1 -2
jetstream/Cargo.toml
···
[dependencies]
async-trait = "0.1.83"
-
atrium-api = { version = "0.24.7", default-features = false, features = [
+
atrium-api = { version = "0.25", default-features = false, features = [
"namespace-appbsky",
] }
tokio = { version = "1.41.1", features = ["full", "sync", "time"] }
···
chrono = "0.4.38"
zstd = "0.13.2"
thiserror = "2.0.3"
-
flume = "0.11.1"
log = "0.4.22"
tokio-util = "0.7.13"
+8 -4
jetstream/examples/arbitrary_record.rs
···
..Default::default()
};
-
let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?;
-
let receiver = jetstream.connect().await?;
+
let jetstream = JetstreamConnector::new(config)?;
+
let mut receiver = jetstream.connect().await?;
-
println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
+
println!(
+
"Listening for '{}' events on DIDs: {:?}",
+
args.nsid.as_str(),
+
dids
+
);
-
while let Ok(event) = receiver.recv_async().await {
+
while let Some(event) = receiver.recv().await {
if let Commit(CommitEvent::Create { commit, .. }) = event {
println!("got record: {:?}", commit.record);
}
+10 -5
jetstream/examples/basic.rs
···
};
let jetstream = JetstreamConnector::new(config)?;
-
let receiver = jetstream.connect().await?;
+
let mut receiver = jetstream.connect().await?;
-
println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
+
println!(
+
"Listening for '{}' events on DIDs: {:?}",
+
args.nsid.as_str(),
+
dids
+
);
-
while let Ok(event) = receiver.recv_async().await {
+
while let Some(event) = receiver.recv().await {
if let Commit(commit) = event {
match commit {
CommitEvent::Create { info: _, commit } => {
if let AppBskyFeedPost(record) = commit.record {
println!(
"New post created! ({})\n\n'{}'",
-
commit.info.rkey, record.text
+
commit.info.rkey.as_str(),
+
record.text
);
}
}
CommitEvent::Delete { info: _, commit } => {
-
println!("A post has been deleted. ({})", commit.rkey);
+
println!("A post has been deleted. ({})", commit.rkey.as_str());
}
_ => {}
}
+2
jetstream/src/error.rs
···
CompressionDecoderError(io::Error),
#[error("all receivers were dropped but the websocket connection failed to close cleanly")]
WebSocketCloseFailure,
+
#[error("failed to send ping or pong: {0}")]
+
PingPongError(#[from] tokio_tungstenite::tungstenite::Error),
}
+1 -1
jetstream/src/events/commit.rs
···
/// The type of commit operation that was performed.
pub operation: CommitType,
pub rev: String,
-
pub rkey: String,
+
pub rkey: exports::RecordKey,
/// The NSID of the record type that this commit is associated with.
pub collection: exports::Nsid,
}
+1
jetstream/src/exports.rs
···
Did,
Handle,
Nsid,
+
RecordKey,
};
+23 -11
jetstream/src/lib.rs
···
use serde::de::DeserializeOwned;
use tokio::{
net::TcpStream,
-
sync::Mutex,
+
sync::{
+
mpsc::{
+
channel,
+
Receiver,
+
Sender,
+
},
+
Mutex,
+
},
};
use tokio_tungstenite::{
connect_async,
···
const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
/// A receiver channel for consuming Jetstream events.
-
pub type JetstreamReceiver<R> = flume::Receiver<JetstreamEvent<R>>;
+
pub type JetstreamReceiver<R> = Receiver<JetstreamEvent<R>>;
/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
-
type JetstreamSender<R> = flume::Sender<JetstreamEvent<R>>;
+
type JetstreamSender<R> = Sender<JetstreamEvent<R>>;
/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
/// receive and consume events. See [JetstreamConnector::connect] for more info.
···
wanted_dids: Vec::new(),
compression: JetstreamCompression::None,
cursor: None,
-
channel_size: 1024,
+
channel_size: 4096, // a few seconds of firehose buffer
record_type: PhantomData,
}
}
···
.validate()
.map_err(ConnectionError::InvalidConfig)?;
-
let (send_channel, receive_channel) = flume::bounded(self.config.channel_size);
+
let (send_channel, receive_channel) = channel(self.config.channel_size);
let configured_endpoint = self
.config
···
}
if retry_attempt >= max_retries {
+
eprintln!("max retries, bye");
break;
}
+
+
eprintln!("will try to reconnect");
// Exponential backoff
let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt));
···
let false = ping_cancelled.is_cancelled() else {
break;
};
-
log::trace!("Sending ping");
match ping_shared_socket_write
.lock()
.await
-
.send(Message::Ping("ping".as_bytes().to_vec()))
+
.send(Message::Ping("ping!!!!".as_bytes().to_vec()))
.await
{
Ok(_) => (),
Err(error) => {
+
eprintln!("ping send failed");
log::error!("Ping failed: {error}");
break;
}
}
}
+
eprintln!("oh this is bad news.");
});
let mut closing_connection = false;
···
let event = serde_json::from_str(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
-
if send_channel.send(event).is_err() {
+
if send_channel.send(event).await.is_err() {
// We can assume that all receivers have been dropped, so we can close
// the connection and exit the task.
log::info!(
···
let event = serde_json::from_str(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
-
if send_channel.send(event).is_err() {
+
if send_channel.send(event).await.is_err() {
// We can assume that all receivers have been dropped, so we can close
// the connection and exit the task.
log::info!(
···
}
Message::Ping(vec) => {
log::trace!("Ping recieved, responding");
-
_ = shared_socket_write
+
shared_socket_write
.lock()
.await
.send(Message::Pong(vec))
-
.await;
+
.await
+
.map_err(JetstreamEventError::PingPongError)?;
}
Message::Close(close_frame) => {
if let Some(close_frame) = close_frame {
+148
ufos/src/consumer.rs
···
+
use jetstream::exports::Did;
+
use jetstream::{
+
events::{
+
account::AccountEvent,
+
commit::{CommitData, CommitEvent, CommitInfo},
+
EventInfo, JetstreamEvent,
+
},
+
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
+
JetstreamReceiver,
+
};
+
use std::mem;
+
use std::time::Duration;
+
use tokio::sync::mpsc::{channel, error::TrySendError, Receiver, Sender};
+
+
use crate::{DeleteRecord, EventBatch, SetRecord};
+
+
const MAX_BATCHED_RECORDS: usize = 32; // non-blocking limit. drops oldest batched record per collection.
+
const MAX_BATCHED_COLLECTIONS: usize = 256; // block at this point. pretty arbitrary, limit unbounded growth since during replay it could grow a lot.
+
const MAX_BATCHED_DELETES: usize = 1024; // block at this point. fairly arbitrary, limit unbounded.
+
const MAX_ACCOUNT_REMOVES: usize = 1; // block at this point. these can be heavy so hold at each one.
+
+
const SEND_TIMEOUT_S: f64 = 3.;
+
+
#[derive(Debug)]
+
struct Batcher {
+
jetstream_receiver: JetstreamReceiver<serde_json::Value>,
+
batch_sender: Sender<EventBatch>,
+
current_batch: EventBatch,
+
}
+
+
pub async fn consume(jetstream_endpoint: &str) -> anyhow::Result<Receiver<EventBatch>> {
+
let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
+
endpoint: DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint),
+
compression: JetstreamCompression::Zstd,
+
..Default::default()
+
};
+
let jetstream_receiver = JetstreamConnector::new(config)?.connect().await?;
+
let (batch_sender, batch_reciever) = channel::<EventBatch>(1); // *almost* rendezvous: one message in the middle
+
let mut batcher = Batcher::new(jetstream_receiver, batch_sender);
+
tokio::task::spawn(async move { batcher.run().await });
+
Ok(batch_reciever)
+
}
+
+
impl Batcher {
+
fn new(
+
jetstream_receiver: JetstreamReceiver<serde_json::Value>,
+
batch_sender: Sender<EventBatch>,
+
) -> Self {
+
Self {
+
jetstream_receiver,
+
batch_sender,
+
current_batch: Default::default(),
+
}
+
}
+
+
async fn run(&mut self) -> anyhow::Result<()> {
+
loop {
+
if let Some(event) = self.jetstream_receiver.recv().await {
+
self.handle_event(event).await?
+
} else {
+
anyhow::bail!("channel closed");
+
}
+
}
+
}
+
+
async fn handle_event(
+
&mut self,
+
event: JetstreamEvent<serde_json::Value>,
+
) -> anyhow::Result<()> {
+
let batch_full = match event {
+
JetstreamEvent::Commit(CommitEvent::Create { commit, info }) => {
+
self.handle_set_record(true, commit, info)
+
}
+
JetstreamEvent::Commit(CommitEvent::Update { commit, info }) => {
+
self.handle_set_record(false, commit, info)
+
}
+
JetstreamEvent::Commit(CommitEvent::Delete { commit, info }) => {
+
self.handle_delete_record(commit, info)
+
}
+
JetstreamEvent::Identity(_) => false, // identity events are noops for us
+
JetstreamEvent::Account(AccountEvent { info, account }) if !account.active => {
+
self.handle_remove_account(info.did)
+
}
+
JetstreamEvent::Account(_) => false, // ignore account *activations*
+
};
+
if batch_full {
+
self.batch_sender
+
.send_timeout(
+
mem::take(&mut self.current_batch),
+
Duration::from_secs_f64(SEND_TIMEOUT_S),
+
)
+
.await?;
+
} else {
+
match self.batch_sender.try_reserve() {
+
Ok(permit) => permit.send(mem::take(&mut self.current_batch)),
+
Err(TrySendError::Full(())) => {} // no worries if not, keep batching while waiting for capacity
+
Err(TrySendError::Closed(())) => anyhow::bail!("batch channel closed"),
+
}
+
}
+
Ok(())
+
}
+
+
fn handle_set_record(
+
&mut self,
+
new: bool,
+
commit: CommitData<serde_json::Value>,
+
info: EventInfo,
+
) -> bool {
+
let record = SetRecord {
+
new,
+
did: info.did,
+
rkey: commit.info.rkey,
+
record: commit.record,
+
};
+
let mut created_collection = false;
+
let collection = self
+
.current_batch
+
.records
+
.entry(commit.info.collection)
+
.or_insert_with(|| {
+
created_collection = true;
+
Default::default()
+
});
+
collection.push_front(record);
+
collection.truncate(MAX_BATCHED_RECORDS);
+
+
if created_collection {
+
self.current_batch.records.len() >= MAX_BATCHED_COLLECTIONS // full if we have collections to the max
+
} else {
+
false
+
}
+
}
+
+
fn handle_delete_record(&mut self, commit_info: CommitInfo, info: EventInfo) -> bool {
+
let rm = DeleteRecord {
+
did: info.did,
+
collection: commit_info.collection,
+
rkey: commit_info.rkey,
+
};
+
self.current_batch.record_deletes.push(rm);
+
self.current_batch.record_deletes.len() >= MAX_BATCHED_DELETES
+
}
+
+
fn handle_remove_account(&mut self, did: Did) -> bool {
+
self.current_batch.account_removes.push(did);
+
self.current_batch.account_removes.len() >= MAX_ACCOUNT_REMOVES
+
}
+
}
+27
ufos/src/lib.rs
···
+
pub mod consumer;
+
pub mod store;
+
+
use jetstream::exports::{Did, Nsid, RecordKey};
+
use std::collections::{HashMap, VecDeque};
+
+
#[derive(Debug)]
+
pub struct SetRecord {
+
pub new: bool,
+
pub did: Did,
+
pub rkey: RecordKey,
+
pub record: serde_json::Value,
+
}
+
+
#[derive(Debug)]
+
pub struct DeleteRecord {
+
pub did: Did,
+
pub collection: Nsid,
+
pub rkey: RecordKey,
+
}
+
+
#[derive(Debug, Default)]
+
pub struct EventBatch {
+
pub records: HashMap<Nsid, VecDeque<SetRecord>>,
+
pub record_deletes: Vec<DeleteRecord>,
+
pub account_removes: Vec<Did>,
+
}
+4 -70
ufos/src/main.rs
···
use clap::Parser;
use std::path::PathBuf;
-
use std::time::{Duration, Instant};
-
-
use tokio::select;
-
use tokio_util::sync::CancellationToken;
-
-
use jetstream::{
-
events::{commit::CommitEvent, JetstreamEvent::Commit},
-
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
-
};
+
use ufos::consumer;
+
use ufos::store;
/// Aggregate links in the at-mosphere
#[derive(Parser, Debug)]
···
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
-
-
let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
-
endpoint: DefaultJetstreamEndpoints::endpoint_or_shortcut(&args.jetstream),
-
compression: JetstreamCompression::Zstd,
-
..Default::default()
-
};
-
-
let stay_alive = CancellationToken::new();
-
-
ctrlc::set_handler({
-
let mut desperation: u8 = 0;
-
let stay_alive = stay_alive.clone();
-
move || match desperation {
-
0 => {
-
println!("ok, signalling shutdown...");
-
stay_alive.cancel();
-
desperation += 1;
-
}
-
1.. => panic!("fine, panicking!"),
-
}
-
})?;
-
-
let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?;
-
let receiver = jetstream.connect().await?;
-
-
println!("Jetstream ready");
-
-
let print_throttle = Duration::from_millis(400);
-
let mut last = Instant::now();
-
loop {
-
select! {
-
_ = stay_alive.cancelled() => {
-
eprintln!("byeeee");
-
break
-
}
-
ev = receiver.recv_async() => {
-
match ev {
-
Ok(event) => {
-
if let Commit(CommitEvent::Create { commit, .. }) = event {
-
let now = Instant::now();
-
let since = now - last;
-
if since >= print_throttle {
-
let overshoot = since - print_throttle; // adjust to keep the rate on average
-
last = now - overshoot;
-
println!(
-
"{}: {}",
-
&*commit.info.collection,
-
serde_json::to_string(&commit.record)?
-
);
-
}
-
}
-
},
-
Err(e) => {
-
eprintln!("jetstream event error: {e:?}");
-
break
-
}
-
}
-
}
-
}
-
}
-
+
let batches = consumer::consume(&args.jetstream).await?;
+
store::receive(batches).await?;
Ok(())
}
+32
ufos/src/store.rs
···
+
use crate::EventBatch;
+
use std::time::Duration;
+
use tokio::sync::mpsc::Receiver;
+
use tokio::time::sleep;
+
+
pub async fn receive(mut receiver: Receiver<EventBatch>) -> anyhow::Result<()> {
+
loop {
+
eprintln!("receive loop, sleeping:");
+
sleep(Duration::from_secs_f64(0.5)).await;
+
eprintln!("slept.");
+
if let Some(batch) = receiver.recv().await {
+
eprintln!("got batch");
+
summarize(batch)
+
} else {
+
anyhow::bail!("receive channel closed")
+
}
+
}
+
}
+
+
fn summarize(batch: EventBatch) {
+
let EventBatch {
+
records,
+
record_deletes,
+
account_removes,
+
} = batch;
+
println!(
+
"got batch with {} collections, {} record deletes, {} account removes",
+
records.len(),
+
record_deletes.len(),
+
account_removes.len()
+
);
+
}