//! forked from microcosm.blue/microcosm-rs
//!
//! Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::store_types::SketchSecretPrefix;
2use jetstream::{
3 events::{Cursor, EventKind, JetstreamEvent},
4 exports::{Did, Nsid},
5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
6 JetstreamReceiver,
7};
8use metrics::{
9 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
10};
11use std::mem;
12use std::time::Duration;
13use tokio::sync::mpsc::{channel, Receiver, Sender};
14use tokio::time::{timeout, Interval};
15
16use crate::error::{BatchInsertError, FirehoseEventError};
17use crate::{DeleteAccount, EventBatch, UFOsCommit};
18
// Batching knobs. "span" below refers to firehose time: the cursor distance
// between the first and latest event placed in the current batch.
pub const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached.
pub const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case
pub const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection
pub const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe: don't opportunistically flush a batch younger than this
pub const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now
pub const SEND_TIMEOUT_S: f64 = 150.; // if the channel is blocked longer than this, something is probably up
pub const BATCH_QUEUE_SIZE: usize = 64; // used to be 1, but sometimes inserts are just really slow????????

/// An `EventBatch` capped at `MAX_BATCHED_RECORDS` records per collection.
pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>;
28
/// The batch being accumulated right now, together with the cursor of the
/// first event that landed in it (used to bound how much firehose time a
/// single batch is allowed to span).
#[derive(Debug, Default)]
struct CurrentBatch {
    // cursor of the first event added since the last flush; None for a fresh batch
    initial_cursor: Option<Cursor>,
    batch: LimitedBatch,
}
34
/// Pulls events from a jetstream connection, groups them into
/// `LimitedBatch`es, and ships them to storage over an mpsc channel.
#[derive(Debug)]
pub struct Batcher {
    // incoming firehose events
    jetstream_receiver: JetstreamReceiver,
    // outgoing batches toward the storage/insert side
    batch_sender: Sender<LimitedBatch>,
    current_batch: CurrentBatch,
    // opaque secret passed through to batch inserts (used for record sketching)
    sketch_secret: SketchSecretPrefix,
    // enforces minimum spacing between consecutive batch sends
    rate_limit: Interval,
}
43
44pub async fn consume(
45 jetstream_endpoint: &str,
46 cursor: Option<Cursor>,
47 no_compress: bool,
48 sketch_secret: SketchSecretPrefix,
49) -> anyhow::Result<Receiver<LimitedBatch>> {
50 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
51 if endpoint == jetstream_endpoint {
52 log::info!("connecting to jetstream at {endpoint}");
53 } else {
54 log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
55 }
56 let config: JetstreamConfig = JetstreamConfig {
57 endpoint,
58 compression: if no_compress {
59 JetstreamCompression::None
60 } else {
61 JetstreamCompression::Zstd
62 },
63 replay_on_reconnect: true,
64 channel_size: 1024, // buffer up to ~1s of jetstream events
65 ..Default::default()
66 };
67 let jetstream_receiver = JetstreamConnector::new(config)?
68 .connect_cursor(cursor)
69 .await?;
70 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
71 let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret);
72 tokio::task::spawn(async move {
73 let r = batcher.run().await;
74 log::warn!("batcher ended: {r:?}");
75 });
76 Ok(batch_reciever)
77}
78
79impl Batcher {
80 pub fn new(
81 jetstream_receiver: JetstreamReceiver,
82 batch_sender: Sender<LimitedBatch>,
83 sketch_secret: SketchSecretPrefix,
84 ) -> Self {
85 describe_counter!(
86 "batcher_batches_sent",
87 Unit::Count,
88 "how many batches of events were sent from Batcher to storage"
89 );
90 describe_gauge!(
91 "batcher_batch_age",
92 Unit::Microseconds,
93 "how old the last-sent batch was"
94 );
95 describe_gauge!(
96 "batcher_send_queue_capacity",
97 Unit::Count,
98 "how many spaces are available for batches in the send queue"
99 );
100 describe_histogram!(
101 "batcher_total_collections",
102 Unit::Count,
103 "how many collections are in this batch"
104 );
105 let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(3));
106 rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
107 Self {
108 jetstream_receiver,
109 batch_sender,
110 current_batch: Default::default(),
111 sketch_secret,
112 rate_limit,
113 }
114 }
115
116 pub async fn run(&mut self) -> anyhow::Result<()> {
117 // TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish
118 loop {
119 match timeout(Duration::from_secs_f64(30.), self.jetstream_receiver.recv()).await {
120 Err(_elapsed) => self.no_events_step().await?,
121 Ok(Some(event)) => self.handle_event(event).await?,
122 Ok(None) => anyhow::bail!("channel closed"),
123 }
124 }
125 }
126
127 async fn no_events_step(&mut self) -> anyhow::Result<()> {
128 let empty = self.current_batch.batch.is_empty();
129 log::info!("no events received, stepping batcher (empty? {empty})");
130 if !empty {
131 self.send_current_batch_now(true, "no events step").await?;
132 }
133 Ok(())
134 }
135
136 async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> {
137 if let Some(earliest) = &self.current_batch.initial_cursor {
138 if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
139 {
140 self.send_current_batch_now(false, "time since event")
141 .await?;
142 }
143 } else {
144 self.current_batch.initial_cursor = Some(event.cursor);
145 }
146
147 match event.kind {
148 EventKind::Commit => {
149 let commit = event
150 .commit
151 .ok_or(FirehoseEventError::CommitEventMissingCommit)?;
152 let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?;
153 self.handle_commit(commit, nsid).await?;
154 }
155 EventKind::Account => {
156 let account = event
157 .account
158 .ok_or(FirehoseEventError::AccountEventMissingAccount)?;
159 if !account.active {
160 self.handle_delete_account(event.did, event.cursor).await?;
161 }
162 }
163 _ => {}
164 }
165
166 // if the queue is empty and we have enough, send immediately. otherewise, let the current batch fill up.
167 if let Some(earliest) = &self.current_batch.initial_cursor {
168 if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
169 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
170 {
171 self.send_current_batch_now(true, "available queue").await?;
172 }
173 }
174 Ok(())
175 }
176
177 async fn handle_commit(&mut self, commit: UFOsCommit, collection: Nsid) -> anyhow::Result<()> {
178 let optimistic_res = self.current_batch.batch.insert_commit_by_nsid(
179 &collection,
180 commit,
181 MAX_BATCHED_COLLECTIONS,
182 &self.sketch_secret,
183 );
184
185 if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
186 self.send_current_batch_now(false, "handle commit").await?;
187 self.current_batch.batch.insert_commit_by_nsid(
188 &collection,
189 commit,
190 MAX_BATCHED_COLLECTIONS,
191 &self.sketch_secret,
192 )?;
193 } else {
194 optimistic_res?;
195 }
196
197 Ok(())
198 }
199
200 async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
201 if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
202 self.send_current_batch_now(false, "delete account").await?;
203 }
204 self.current_batch
205 .batch
206 .account_removes
207 .push(DeleteAccount { did, cursor });
208 Ok(())
209 }
210
211 // holds up all consumer progress until it can send to the channel
212 // use this when the current batch is too full to add more to it
213 async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> {
214 let size_label = if small { "small" } else { "full" };
215 let queue_cap = self.batch_sender.capacity();
216
217 if let Some(cursor) = self.current_batch.initial_cursor {
218 gauge!("batcher_batch_age", "size" => size_label).set(cursor.elapsed_micros_f64());
219 }
220 histogram!("batcher_total_collections", "size" => size_label)
221 .record(self.current_batch.batch.total_collections() as f64);
222 gauge!("batcher_send_queue_capacity").set(queue_cap as f64);
223
224 let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
225 None => "unknown".to_string(),
226 Some(Ok(t)) => format!("{t:?}"),
227 Some(Err(e)) => format!("+{:?}", e.duration()),
228 };
229 log::trace!(
230 "sending batch now from {beginning}, {size_label}, queue capacity: {queue_cap}, referrer: {referrer}",
231 );
232 let current = mem::take(&mut self.current_batch);
233 self.rate_limit.tick().await;
234 self.batch_sender
235 .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S))
236 .await?;
237 counter!("batcher_batches_sent", "size" => size_label, "referrer" => referrer.to_string())
238 .increment(1);
239 Ok(())
240 }
241}