// Forked from microcosm.blue/microcosm-rs:
// "Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm".
1use crate::ClientMessage;
2use crate::error::ConsumerError;
3use crate::removable_delay_queue;
4use jetstream::{
5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
6 events::{CommitOp, Cursor, EventKind},
7};
8use links::collect_links;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11use tokio_util::sync::CancellationToken;
12
/// Upper bound on how many links are taken from a single record.
///
/// Links beyond this count are dropped, so the per-link index used in delay-queue keys
/// always stays strictly below this value.
const MAX_LINKS_PER_EVENT: usize = 100;
14
15pub async fn consume(
16 b: broadcast::Sender<Arc<ClientMessage>>,
17 d: removable_delay_queue::Input<(String, usize), Arc<ClientMessage>>,
18 jetstream_endpoint: String,
19 cursor: Option<Cursor>,
20 no_zstd: bool,
21 shutdown: CancellationToken,
22) -> Result<(), ConsumerError> {
23 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint);
24 if endpoint == jetstream_endpoint {
25 log::info!("consumer: connecting jetstream at {endpoint}");
26 } else {
27 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}");
28 }
29 let config: JetstreamConfig = JetstreamConfig {
30 endpoint,
31 compression: if no_zstd {
32 JetstreamCompression::None
33 } else {
34 JetstreamCompression::Zstd
35 },
36 replay_on_reconnect: true,
37 channel_size: 1024, // buffer up to ~1s of jetstream events
38 ..Default::default()
39 };
40 let mut receiver = JetstreamConnector::new(config)?
41 .connect_cursor(cursor)
42 .await?;
43
44 log::info!("consumer: receiving messages..");
45 loop {
46 if shutdown.is_cancelled() {
47 log::info!("consumer: exiting for shutdown");
48 return Ok(());
49 }
50 let Some(event) = receiver.recv().await else {
51 log::error!("consumer: could not receive event, bailing");
52 break;
53 };
54
55 if event.kind != EventKind::Commit {
56 continue;
57 }
58 let Some(ref commit) = event.commit else {
59 log::warn!("consumer: commit event missing commit data, ignoring");
60 continue;
61 };
62
63 // TODO: something a bit more robust
64 let at_uri = format!(
65 "at://{}/{}/{}",
66 &*event.did, &*commit.collection, &*commit.rkey
67 );
68
69 // TODO: keep a buffer and remove quick deletes to debounce notifs
70 // for now we just drop all deletes eek
71 if commit.operation == CommitOp::Delete {
72 d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT))
73 .await;
74 continue;
75 }
76 let Some(ref record) = commit.record else {
77 log::warn!("consumer: commit update/delete missing record, ignoring");
78 continue;
79 };
80
81 let jv = match record.get().parse() {
82 Ok(v) => v,
83 Err(e) => {
84 log::warn!("consumer: record failed to parse, ignoring: {e}");
85 continue;
86 }
87 };
88
89 for (i, link) in collect_links(&jv).into_iter().enumerate() {
90 if i >= MAX_LINKS_PER_EVENT {
91 // todo: indicate if the link limit was reached (-> links omitted)
92 log::warn!("consumer: event has too many links, ignoring the rest");
93 metrics::counter!("consumer_dropped_links", "reason" => "too_many_links")
94 .increment(1);
95 break;
96 }
97 let client_message = match ClientMessage::new_link(link, &at_uri, commit) {
98 Ok(m) => m,
99 Err(e) => {
100 // TODO indicate to clients that a link has been dropped
101 log::warn!("consumer: failed to serialize link to json: {e:?}");
102 metrics::counter!("consumer_dropped_links", "reason" => "failed_to_serialize")
103 .increment(1);
104 continue;
105 }
106 };
107 let message = Arc::new(client_message);
108 let _ = b.send(message.clone()); // only errors if no subscribers are connected, which is just fine.
109 d.enqueue((at_uri.clone(), i), message)
110 .await
111 .map_err(|_| ConsumerError::DelayQueueOutputDropped)?;
112 }
113 }
114
115 Err(ConsumerError::JetstreamEnded)
116}