Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 4.3 kB view raw
1use crate::ClientMessage; 2use crate::error::ConsumerError; 3use crate::removable_delay_queue; 4use jetstream::{ 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 6 events::{CommitOp, Cursor, EventKind}, 7}; 8use links::collect_links; 9use std::sync::Arc; 10use tokio::sync::broadcast; 11use tokio_util::sync::CancellationToken; 12 13const MAX_LINKS_PER_EVENT: usize = 100; 14 15pub async fn consume( 16 b: broadcast::Sender<Arc<ClientMessage>>, 17 d: removable_delay_queue::Input<(String, usize), Arc<ClientMessage>>, 18 jetstream_endpoint: String, 19 cursor: Option<Cursor>, 20 no_zstd: bool, 21 shutdown: CancellationToken, 22) -> Result<(), ConsumerError> { 23 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint); 24 if endpoint == jetstream_endpoint { 25 log::info!("consumer: connecting jetstream at {endpoint}"); 26 } else { 27 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}"); 28 } 29 let config: JetstreamConfig = JetstreamConfig { 30 endpoint, 31 compression: if no_zstd { 32 JetstreamCompression::None 33 } else { 34 JetstreamCompression::Zstd 35 }, 36 replay_on_reconnect: true, 37 channel_size: 1024, // buffer up to ~1s of jetstream events 38 ..Default::default() 39 }; 40 let mut receiver = JetstreamConnector::new(config)? 41 .connect_cursor(cursor) 42 .await?; 43 44 log::info!("consumer: receiving messages.."); 45 loop { 46 if shutdown.is_cancelled() { 47 log::info!("consumer: exiting for shutdown"); 48 return Ok(()); 49 } 50 let Some(event) = receiver.recv().await else { 51 log::error!("consumer: could not receive event, bailing"); 52 break; 53 }; 54 55 if event.kind != EventKind::Commit { 56 continue; 57 } 58 let Some(ref commit) = event.commit else { 59 log::warn!("consumer: commit event missing commit data, ignoring"); 60 continue; 61 }; 62 63 // TODO: something a bit more robust 64 let at_uri = format!( 65 "at://{}/{}/{}", 66 &*event.did, &*commit.collection, &*commit.rkey 67 ); 68 69 // TODO: keep a buffer and remove quick deletes to debounce notifs 70 // for now we just drop all deletes eek 71 if commit.operation == CommitOp::Delete { 72 d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT)) 73 .await; 74 continue; 75 } 76 let Some(ref record) = commit.record else { 77 log::warn!("consumer: commit update/delete missing record, ignoring"); 78 continue; 79 }; 80 81 let jv = match record.get().parse() { 82 Ok(v) => v, 83 Err(e) => { 84 log::warn!("consumer: record failed to parse, ignoring: {e}"); 85 continue; 86 } 87 }; 88 89 for (i, link) in collect_links(&jv).into_iter().enumerate() { 90 if i >= MAX_LINKS_PER_EVENT { 91 // todo: indicate if the link limit was reached (-> links omitted) 92 log::warn!("consumer: event has too many links, ignoring the rest"); 93 metrics::counter!("consumer_dropped_links", "reason" => "too_many_links") 94 .increment(1); 95 break; 96 } 97 let client_message = match ClientMessage::new_link(link, &at_uri, commit) { 98 Ok(m) => m, 99 Err(e) => { 100 // TODO indicate to clients that a link has been dropped 101 log::warn!("consumer: failed to serialize link to json: {e:?}"); 102 metrics::counter!("consumer_dropped_links", "reason" => "failed_to_serialize") 103 .increment(1); 104 continue; 105 } 106 }; 107 let message = Arc::new(client_message); 108 let _ = b.send(message.clone()); // only errors if no subscribers are connected, which is just fine. 109 d.enqueue((at_uri.clone(), i), message) 110 .await 111 .map_err(|_| ConsumerError::DelayQueueOutputDropped)?; 112 } 113 } 114 115 Err(ConsumerError::JetstreamEnded) 116}