Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::CachedRecord; 2use crate::error::ConsumerError; 3use foyer::HybridCache; 4use jetstream::{ 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 6 events::{CommitOp, Cursor, EventKind}, 7}; 8use tokio_util::sync::CancellationToken; 9 10pub async fn consume( 11 jetstream_endpoint: String, 12 cursor: Option<Cursor>, 13 no_zstd: bool, 14 shutdown: CancellationToken, 15 cache: HybridCache<String, CachedRecord>, 16) -> Result<(), ConsumerError> { 17 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint); 18 if endpoint == jetstream_endpoint { 19 log::info!("consumer: connecting jetstream at {endpoint}"); 20 } else { 21 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}"); 22 } 23 let config: JetstreamConfig = JetstreamConfig { 24 endpoint, 25 compression: if no_zstd { 26 JetstreamCompression::None 27 } else { 28 JetstreamCompression::Zstd 29 }, 30 replay_on_reconnect: true, 31 channel_size: 1024, // buffer up to ~1s of jetstream events 32 ..Default::default() 33 }; 34 let mut receiver = JetstreamConnector::new(config)? 35 .connect_cursor(cursor) 36 .await?; 37 38 log::info!("consumer: receiving messages.."); 39 loop { 40 if shutdown.is_cancelled() { 41 log::info!("consumer: exiting for shutdown"); 42 return Ok(()); 43 } 44 let Some(mut event) = receiver.recv().await else { 45 log::error!("consumer: could not receive event, bailing"); 46 break; 47 }; 48 49 if event.kind != EventKind::Commit { 50 continue; 51 } 52 let Some(ref mut commit) = event.commit else { 53 log::warn!("consumer: commit event missing commit data, ignoring"); 54 continue; 55 }; 56 57 // TODO: something a bit more robust 58 let at_uri = format!( 59 "at://{}/{}/{}", 60 &*event.did, &*commit.collection, &*commit.rkey 61 ); 62 63 if commit.operation == CommitOp::Delete { 64 cache.insert(at_uri, CachedRecord::Deleted); 65 } else { 66 let Some(record) = commit.record.take() else { 67 log::warn!("consumer: commit insert or update missing record, ignoring"); 68 continue; 69 }; 70 let Some(cid) = commit.cid.take() else { 71 log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 continue; 73 }; 74 75 cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 } 77 } 78 79 Err(ConsumerError::JetstreamEnded) 80}