forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::CachedRecord;
2use crate::error::ConsumerError;
3use foyer::HybridCache;
4use jetstream::{
5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
6 events::{CommitOp, Cursor, EventKind},
7};
8use tokio_util::sync::CancellationToken;
9
10pub async fn consume(
11 jetstream_endpoint: String,
12 cursor: Option<Cursor>,
13 no_zstd: bool,
14 shutdown: CancellationToken,
15 cache: HybridCache<String, CachedRecord>,
16) -> Result<(), ConsumerError> {
17 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint);
18 if endpoint == jetstream_endpoint {
19 log::info!("consumer: connecting jetstream at {endpoint}");
20 } else {
21 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}");
22 }
23 let config: JetstreamConfig = JetstreamConfig {
24 endpoint,
25 compression: if no_zstd {
26 JetstreamCompression::None
27 } else {
28 JetstreamCompression::Zstd
29 },
30 replay_on_reconnect: true,
31 channel_size: 1024, // buffer up to ~1s of jetstream events
32 ..Default::default()
33 };
34 let mut receiver = JetstreamConnector::new(config)?
35 .connect_cursor(cursor)
36 .await?;
37
38 log::info!("consumer: receiving messages..");
39 loop {
40 if shutdown.is_cancelled() {
41 log::info!("consumer: exiting for shutdown");
42 return Ok(());
43 }
44 let Some(mut event) = receiver.recv().await else {
45 log::error!("consumer: could not receive event, bailing");
46 break;
47 };
48
49 if event.kind != EventKind::Commit {
50 continue;
51 }
52 let Some(ref mut commit) = event.commit else {
53 log::warn!("consumer: commit event missing commit data, ignoring");
54 continue;
55 };
56
57 // TODO: something a bit more robust
58 let at_uri = format!(
59 "at://{}/{}/{}",
60 &*event.did, &*commit.collection, &*commit.rkey
61 );
62
63 if commit.operation == CommitOp::Delete {
64 cache.insert(at_uri, CachedRecord::Deleted);
65 } else {
66 let Some(record) = commit.record.take() else {
67 log::warn!("consumer: commit insert or update missing record, ignoring");
68 continue;
69 };
70 let Some(cid) = commit.cid.take() else {
71 log::warn!("consumer: commit insert or update missing CID, ignoring");
72 continue;
73 };
74
75 cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
76 }
77 }
78
79 Err(ConsumerError::JetstreamEnded)
80}