Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 3.0 kB view raw
1use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE}; 2use crate::store_types::SketchSecretPrefix; 3use crate::Cursor; 4use anyhow::Result; 5use jetstream::{error::JetstreamEventError, events::JetstreamEvent}; 6use std::path::PathBuf; 7use tokio::{ 8 fs::File, 9 io::{AsyncBufReadExt, BufReader}, 10 sync::mpsc::{channel, Receiver, Sender}, 11}; 12 13async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>, cursor: Option<Cursor>) -> Result<()> { 14 let mut lines = BufReader::new(f).lines(); 15 if let Some(db_cursor) = cursor { 16 log::info!("jsonl fixture: skipping events before cursor {db_cursor:?}"); 17 let mut bad_lines = 0; 18 let mut skipped = 0; 19 while let Some(line) = lines.next_line().await? { 20 let Ok(event) = serde_json::from_str::<JetstreamEvent>(&line) else { 21 bad_lines += 1; 22 continue; 23 }; 24 if event.cursor < db_cursor { 25 skipped += 1; 26 continue; 27 } 28 if event.cursor == db_cursor { 29 log::info!("jsonl fixture: found existing db cursor! skipped {skipped} old events and failed parsing {bad_lines} lines"); 30 break; 31 } 32 anyhow::bail!("jsonl fixture: did not find existing db cursor, found event cursor {:?} which is newer. bailing.", event.cursor); 33 } 34 } else { 35 log::info!("jsonl fixture: no cursor provided, sending every event"); 36 } 37 38 log::info!("jsonl fixture: now sending events"); 39 while let Some(line) = lines.next_line().await? { 40 match serde_json::from_str::<JetstreamEvent>(&line) { 41 Ok(event) => match sender.send(event).await { 42 Ok(_) => {} 43 Err(e) => { 44 log::warn!("All receivers for the jsonl fixture have been dropped, bye: {e:?}"); 45 return Err(JetstreamEventError::ReceiverClosedError.into()); 46 } 47 }, 48 Err(parse_err) => { 49 log::warn!("failed to parse event: {parse_err:?} from event:\n{line}"); 50 continue; 51 } 52 } 53 } 54 log::info!("reached end of jsonl file, looping on noop to keep server alive."); 55 loop { 56 tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await; 57 } 58} 59 60pub async fn consume( 61 p: PathBuf, 62 sketch_secret: SketchSecretPrefix, 63 cursor: Option<Cursor>, 64) -> Result<Receiver<LimitedBatch>> { 65 let f = File::open(p).await?; 66 let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16); 67 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 68 let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret); 69 tokio::task::spawn(async move { 70 let r = read_jsonl(f, jsonl_sender, cursor).await; 71 log::warn!("read_jsonl finished: {r:?}"); 72 }); 73 tokio::task::spawn(async move { 74 let r = batcher.run().await; 75 log::warn!("batcher finished: {r:?}"); 76 }); 77 Ok(batch_reciever) 78}