Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 9.3 kB view raw
use crate::store_types::SketchSecretPrefix;
use jetstream::{
    events::{Cursor, EventKind, JetstreamEvent},
    exports::{Did, Nsid},
    DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
    JetstreamReceiver,
};
use metrics::{
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
};
use std::mem;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::{timeout, Interval};

use crate::error::{BatchInsertError, FirehoseEventError};
use crate::{DeleteAccount, EventBatch, UFOsCommit};

/// Per-collection record cap for a batch. *Non-blocking* limit: drops the
/// oldest batched record for that collection once reached.
pub const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached.
/// Cap on account-removal entries per batch. Hard limit, extremely unlikely
/// to reach, but just in case.
pub const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case
/// Cap on distinct collections per batch; MAX_BATCHED_RECORDS applies within
/// each collection.
pub const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection
/// Minimum firehose-time span a batch should cover before an eager send.
pub const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe
/// Maximum firehose-time span a batch may cover. Hard limit: the consumer is
/// paused (blocking send) if a batch cannot be shipped by now.
pub const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now
/// Timeout for pushing a batch into the channel; if it blocks longer than
/// this, something is probably up.
pub const SEND_TIMEOUT_S: f64 = 150.; // if the channel is blocked longer than this, something is probably up
/// Depth of the batch channel between Batcher and storage.
pub const BATCH_QUEUE_SIZE: usize = 64; // used to be 1, but sometimes inserts are just really slow????????
26 27pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>; 28 29#[derive(Debug, Default)] 30struct CurrentBatch { 31 initial_cursor: Option<Cursor>, 32 batch: LimitedBatch, 33} 34 35#[derive(Debug)] 36pub struct Batcher { 37 jetstream_receiver: JetstreamReceiver, 38 batch_sender: Sender<LimitedBatch>, 39 current_batch: CurrentBatch, 40 sketch_secret: SketchSecretPrefix, 41 rate_limit: Interval, 42} 43 44pub async fn consume( 45 jetstream_endpoint: &str, 46 cursor: Option<Cursor>, 47 no_compress: bool, 48 sketch_secret: SketchSecretPrefix, 49) -> anyhow::Result<Receiver<LimitedBatch>> { 50 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint); 51 if endpoint == jetstream_endpoint { 52 log::info!("connecting to jetstream at {endpoint}"); 53 } else { 54 log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}"); 55 } 56 let config: JetstreamConfig = JetstreamConfig { 57 endpoint, 58 compression: if no_compress { 59 JetstreamCompression::None 60 } else { 61 JetstreamCompression::Zstd 62 }, 63 replay_on_reconnect: true, 64 channel_size: 1024, // buffer up to ~1s of jetstream events 65 ..Default::default() 66 }; 67 let jetstream_receiver = JetstreamConnector::new(config)? 
68 .connect_cursor(cursor) 69 .await?; 70 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 71 let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret); 72 tokio::task::spawn(async move { 73 let r = batcher.run().await; 74 log::warn!("batcher ended: {r:?}"); 75 }); 76 Ok(batch_reciever) 77} 78 79impl Batcher { 80 pub fn new( 81 jetstream_receiver: JetstreamReceiver, 82 batch_sender: Sender<LimitedBatch>, 83 sketch_secret: SketchSecretPrefix, 84 ) -> Self { 85 describe_counter!( 86 "batcher_batches_sent", 87 Unit::Count, 88 "how many batches of events were sent from Batcher to storage" 89 ); 90 describe_gauge!( 91 "batcher_batch_age", 92 Unit::Microseconds, 93 "how old the last-sent batch was" 94 ); 95 describe_gauge!( 96 "batcher_send_queue_capacity", 97 Unit::Count, 98 "how many spaces are available for batches in the send queue" 99 ); 100 describe_histogram!( 101 "batcher_total_collections", 102 Unit::Count, 103 "how many collections are in this batch" 104 ); 105 let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(3)); 106 rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 107 Self { 108 jetstream_receiver, 109 batch_sender, 110 current_batch: Default::default(), 111 sketch_secret, 112 rate_limit, 113 } 114 } 115 116 pub async fn run(&mut self) -> anyhow::Result<()> { 117 // TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish 118 loop { 119 match timeout(Duration::from_secs_f64(30.), self.jetstream_receiver.recv()).await { 120 Err(_elapsed) => self.no_events_step().await?, 121 Ok(Some(event)) => self.handle_event(event).await?, 122 Ok(None) => anyhow::bail!("channel closed"), 123 } 124 } 125 } 126 127 async fn no_events_step(&mut self) -> anyhow::Result<()> { 128 let empty = self.current_batch.batch.is_empty(); 129 log::info!("no events received, stepping batcher (empty? 
{empty})"); 130 if !empty { 131 self.send_current_batch_now(true, "no events step").await?; 132 } 133 Ok(()) 134 } 135 136 async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> { 137 if let Some(earliest) = &self.current_batch.initial_cursor { 138 if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS) 139 { 140 self.send_current_batch_now(false, "time since event") 141 .await?; 142 } 143 } else { 144 self.current_batch.initial_cursor = Some(event.cursor); 145 } 146 147 match event.kind { 148 EventKind::Commit => { 149 let commit = event 150 .commit 151 .ok_or(FirehoseEventError::CommitEventMissingCommit)?; 152 let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?; 153 self.handle_commit(commit, nsid).await?; 154 } 155 EventKind::Account => { 156 let account = event 157 .account 158 .ok_or(FirehoseEventError::AccountEventMissingAccount)?; 159 if !account.active { 160 self.handle_delete_account(event.did, event.cursor).await?; 161 } 162 } 163 _ => {} 164 } 165 166 // if the queue is empty and we have enough, send immediately. otherewise, let the current batch fill up. 
167 if let Some(earliest) = &self.current_batch.initial_cursor { 168 if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS 169 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE 170 { 171 self.send_current_batch_now(true, "available queue").await?; 172 } 173 } 174 Ok(()) 175 } 176 177 async fn handle_commit(&mut self, commit: UFOsCommit, collection: Nsid) -> anyhow::Result<()> { 178 let optimistic_res = self.current_batch.batch.insert_commit_by_nsid( 179 &collection, 180 commit, 181 MAX_BATCHED_COLLECTIONS, 182 &self.sketch_secret, 183 ); 184 185 if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res { 186 self.send_current_batch_now(false, "handle commit").await?; 187 self.current_batch.batch.insert_commit_by_nsid( 188 &collection, 189 commit, 190 MAX_BATCHED_COLLECTIONS, 191 &self.sketch_secret, 192 )?; 193 } else { 194 optimistic_res?; 195 } 196 197 Ok(()) 198 } 199 200 async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> { 201 if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES { 202 self.send_current_batch_now(false, "delete account").await?; 203 } 204 self.current_batch 205 .batch 206 .account_removes 207 .push(DeleteAccount { did, cursor }); 208 Ok(()) 209 } 210 211 // holds up all consumer progress until it can send to the channel 212 // use this when the current batch is too full to add more to it 213 async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> { 214 let size_label = if small { "small" } else { "full" }; 215 let queue_cap = self.batch_sender.capacity(); 216 217 if let Some(cursor) = self.current_batch.initial_cursor { 218 gauge!("batcher_batch_age", "size" => size_label).set(cursor.elapsed_micros_f64()); 219 } 220 histogram!("batcher_total_collections", "size" => size_label) 221 .record(self.current_batch.batch.total_collections() as f64); 222 gauge!("batcher_send_queue_capacity").set(queue_cap as f64); 223 224 let 
beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) { 225 None => "unknown".to_string(), 226 Some(Ok(t)) => format!("{t:?}"), 227 Some(Err(e)) => format!("+{:?}", e.duration()), 228 }; 229 log::trace!( 230 "sending batch now from {beginning}, {size_label}, queue capacity: {queue_cap}, referrer: {referrer}", 231 ); 232 let current = mem::take(&mut self.current_batch); 233 self.rate_limit.tick().await; 234 self.batch_sender 235 .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S)) 236 .await?; 237 counter!("batcher_batches_sent", "size" => size_label, "referrer" => referrer.to_string()) 238 .increment(1); 239 Ok(()) 240 } 241}