forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
2use crate::store_types::SketchSecretPrefix;
3use crate::Cursor;
4use anyhow::Result;
5use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
6use std::path::PathBuf;
7use tokio::{
8 fs::File,
9 io::{AsyncBufReadExt, BufReader},
10 sync::mpsc::{channel, Receiver, Sender},
11};
12
13async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>, cursor: Option<Cursor>) -> Result<()> {
14 let mut lines = BufReader::new(f).lines();
15 if let Some(db_cursor) = cursor {
16 log::info!("jsonl fixture: skipping events before cursor {db_cursor:?}");
17 let mut bad_lines = 0;
18 let mut skipped = 0;
19 while let Some(line) = lines.next_line().await? {
20 let Ok(event) = serde_json::from_str::<JetstreamEvent>(&line) else {
21 bad_lines += 1;
22 continue;
23 };
24 if event.cursor < db_cursor {
25 skipped += 1;
26 continue;
27 }
28 if event.cursor == db_cursor {
29 log::info!("jsonl fixture: found existing db cursor! skipped {skipped} old events and failed parsing {bad_lines} lines");
30 break;
31 }
32 anyhow::bail!("jsonl fixture: did not find existing db cursor, found event cursor {:?} which is newer. bailing.", event.cursor);
33 }
34 } else {
35 log::info!("jsonl fixture: no cursor provided, sending every event");
36 }
37
38 log::info!("jsonl fixture: now sending events");
39 while let Some(line) = lines.next_line().await? {
40 match serde_json::from_str::<JetstreamEvent>(&line) {
41 Ok(event) => match sender.send(event).await {
42 Ok(_) => {}
43 Err(e) => {
44 log::warn!("All receivers for the jsonl fixture have been dropped, bye: {e:?}");
45 return Err(JetstreamEventError::ReceiverClosedError.into());
46 }
47 },
48 Err(parse_err) => {
49 log::warn!("failed to parse event: {parse_err:?} from event:\n{line}");
50 continue;
51 }
52 }
53 }
54 log::info!("reached end of jsonl file, looping on noop to keep server alive.");
55 loop {
56 tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await;
57 }
58}
59
60pub async fn consume(
61 p: PathBuf,
62 sketch_secret: SketchSecretPrefix,
63 cursor: Option<Cursor>,
64) -> Result<Receiver<LimitedBatch>> {
65 let f = File::open(p).await?;
66 let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
67 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
68 let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
69 tokio::task::spawn(async move {
70 let r = read_jsonl(f, jsonl_sender, cursor).await;
71 log::warn!("read_jsonl finished: {r:?}");
72 });
73 tokio::task::spawn(async move {
74 let r = batcher.run().await;
75 log::warn!("batcher finished: {r:?}");
76 });
77 Ok(batch_reciever)
78}