Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

backfill: faster and allow skipping old events

+7 -2
ufos/src/consumer.rs
···
batch_sender: Sender<LimitedBatch>,
sketch_secret: SketchSecretPrefix,
) -> Self {
- let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(5));
rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Self {
jetstream_receiver,
···
pub async fn run(&mut self) -> anyhow::Result<()> {
// TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish
loop {
- match timeout(Duration::from_millis(9_000), self.jetstream_receiver.recv()).await {
Err(_elapsed) => self.no_events_step().await?,
Ok(Some(event)) => self.handle_event(event).await?,
Ok(None) => anyhow::bail!("channel closed"),
···
batch_sender: Sender<LimitedBatch>,
sketch_secret: SketchSecretPrefix,
) -> Self {
+ let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(3));
rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Self {
jetstream_receiver,
···
pub async fn run(&mut self) -> anyhow::Result<()> {
// TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish
loop {
+ match timeout(
+ Duration::from_millis(30_000),
+ self.jetstream_receiver.recv(),
+ )
+ .await
+ {
Err(_elapsed) => self.no_events_step().await?,
Ok(Some(event)) => self.handle_event(event).await?,
Ok(None) => anyhow::bail!("channel closed"),
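The timeout bump (9s → 30s) keeps the same shape: the channel recv is wrapped in tokio::time::timeout so a quiet stream still triggers the idle step instead of blocking forever, and a closed channel ends the loop. A minimal standalone sketch of that pattern, using an illustrative String channel in place of the real event type:

    use tokio::sync::mpsc::Receiver;
    use tokio::time::{timeout, Duration};

    async fn run(mut rx: Receiver<String>) -> anyhow::Result<()> {
        loop {
            match timeout(Duration::from_millis(30_000), rx.recv()).await {
                // nothing arrived within 30s: run the idle/housekeeping step
                Err(_elapsed) => log::info!("no events for 30s"),
                // got an event: hand it off to the batching logic
                Ok(Some(event)) => log::info!("handling event: {event}"),
                // all senders dropped: the stream is over
                Ok(None) => anyhow::bail!("channel closed"),
            }
        }
    }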
+30 -4
ufos/src/file_consumer.rs
···
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
use crate::store_types::SketchSecretPrefix;
use anyhow::Result;
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
use std::path::PathBuf;
···
sync::mpsc::{channel, Receiver, Sender},
};
- async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>) -> Result<()> {
let mut lines = BufReader::new(f).lines();
while let Some(line) = lines.next_line().await? {
match serde_json::from_str::<JetstreamEvent>(&line) {
Ok(event) => match sender.send(event).await {
···
pub async fn consume(
p: PathBuf,
sketch_secret: SketchSecretPrefix,
) -> Result<Receiver<LimitedBatch>> {
let f = File::open(p).await?;
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
tokio::task::spawn(async move {
- let r = read_jsonl(f, jsonl_sender).await;
- log::info!("read_jsonl finished: {r:?}");
});
tokio::task::spawn(async move {
let r = batcher.run().await;
- log::info!("batcher finished: {r:?}");
});
Ok(batch_reciever)
}
···
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
use crate::store_types::SketchSecretPrefix;
+ use crate::Cursor;
use anyhow::Result;
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
use std::path::PathBuf;
···
sync::mpsc::{channel, Receiver, Sender},
};
+ async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>, cursor: Option<Cursor>) -> Result<()> {
let mut lines = BufReader::new(f).lines();
+ if let Some(db_cursor) = cursor {
+ log::info!("jsonl fixture: skipping events before cursor {db_cursor:?}");
+ let mut bad_lines = 0;
+ let mut skipped = 0;
+ while let Some(line) = lines.next_line().await? {
+ let Ok(event) = serde_json::from_str::<JetstreamEvent>(&line) else {
+ bad_lines += 1;
+ continue;
+ };
+ if event.cursor < db_cursor {
+ skipped += 1;
+ continue;
+ }
+ if event.cursor == db_cursor {
+ log::info!("jsonl fixture: found existing db cursor! skipped {skipped} old events and failed parsing {bad_lines} lines");
+ break;
+ }
+ anyhow::bail!("jsonl fixture: did not find existing db cursor, found event cursor {:?} which is newer. bailing.", event.cursor);
+ }
+ } else {
+ log::info!("jsonl fixture: no cursor provided, sending every event");
+ }
+
+ log::info!("jsonl fixture: now sending events");
while let Some(line) = lines.next_line().await? {
match serde_json::from_str::<JetstreamEvent>(&line) {
Ok(event) => match sender.send(event).await {
···
pub async fn consume(
p: PathBuf,
sketch_secret: SketchSecretPrefix,
+ cursor: Option<Cursor>,
) -> Result<Receiver<LimitedBatch>> {
let f = File::open(p).await?;
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
tokio::task::spawn(async move {
+ let r = read_jsonl(f, jsonl_sender, cursor).await;
+ log::warn!("read_jsonl finished: {r:?}");
});
tokio::task::spawn(async move {
let r = batcher.run().await;
+ log::warn!("batcher finished: {r:?}");
});
Ok(batch_reciever)
}
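The new cursor argument lets a fixture run resume where the database left off: parse failures and already-ingested events are skipped and counted, finding the stored cursor ends the skip phase, and an event newer than the stored cursor means the fixture doesn't cover the gap, so it bails. Stripped of the Jetstream types, the shape is roughly this (u64 cursors and in-memory string lines are stand-ins for the real Cursor and the streamed BufReader lines):

    fn find_resume_point(lines: &[&str], db_cursor: Option<u64>) -> anyhow::Result<usize> {
        // No stored cursor: replay the fixture from the top.
        let Some(db_cursor) = db_cursor else { return Ok(0) };
        let mut skipped = 0;
        for (i, line) in lines.iter().enumerate() {
            // Unparseable lines are just skipped (the real code also counts them).
            let Ok(cursor) = line.trim().parse::<u64>() else { continue };
            if cursor < db_cursor {
                skipped += 1; // already ingested on a previous run
                continue;
            }
            if cursor == db_cursor {
                log::info!("found stored cursor, skipped {skipped} old events");
                return Ok(i + 1); // resume with the event after the stored cursor
            }
            // First unskipped event is already newer than the stored cursor: there's a gap.
            anyhow::bail!("event cursor {cursor} is newer than stored cursor {db_cursor}, bailing");
        }
        // Cursor never found before EOF: nothing newer left to send.
        Ok(lines.len())
    }

In the real read_jsonl the same comparisons run over the streamed lines, and the existing send loop then takes over from the resume point.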
+4 -2
ufos/src/main.rs
···
let batches = if jetstream_fixture {
log::info!("starting with jestream file fixture: {jetstream:?}");
- file_consumer::consume(jetstream.into(), sketch_secret).await?
} else {
log::info!(
"starting consumer with cursor: {cursor:?} from {:?} ago",
···
let mut last_at = std::time::SystemTime::now();
let mut last_cursor = None;
let mut last_rollup = None;
loop {
- tokio::time::sleep(std::time::Duration::from_secs_f64(4.)).await;
match read_store.get_consumer_info().await {
Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
Ok(ConsumerInfo::Jetstream {
···
let batches = if jetstream_fixture {
log::info!("starting with jestream file fixture: {jetstream:?}");
+ file_consumer::consume(jetstream.into(), sketch_secret, cursor).await?
} else {
log::info!(
"starting consumer with cursor: {cursor:?} from {:?} ago",
···
let mut last_at = std::time::SystemTime::now();
let mut last_cursor = None;
let mut last_rollup = None;
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
+ interval.tick().await;
match read_store.get_consumer_info().await {
Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
Ok(ConsumerInfo::Jetstream {
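Swapping the bare sleep for an interval keeps the stats loop on a steady 4-second cadence: tokio schedules each tick relative to the previous one, so the time spent polling consumer info no longer stretches the period, and Delay avoids a burst of catch-up ticks after a slow iteration. A minimal sketch of the pattern (the loop body here is a placeholder):

    use tokio::time::{interval, Duration, MissedTickBehavior};

    async fn stats_loop() {
        let mut ticker = interval(Duration::from_secs(4));
        // If an iteration runs long, push the schedule back rather than firing a burst of ticks.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            ticker.tick().await; // resolves immediately the first time, then every ~4s
            poll_consumer_info().await; // placeholder for the real get_consumer_info + logging
        }
    }

    async fn poll_consumer_info() { /* ... */ }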
+7 -7
ufos/src/storage_fjall.rs
···
let mut ended_early = false;
let mut batch = self.keyspace.batch();
for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
- if !full_scan && i > 1_000_000 {
- log::info!("stopping collection trim early: already scanned 1M elements");
ended_early = true;
break;
}
···
let mut dirty_nsids = HashSet::new();
let mut rollup =
- tokio::time::interval(Duration::from_millis(if backfill { 1 } else { 81 }));
- rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut trim =
- tokio::time::interval(Duration::from_millis(if backfill { 3_000 } else { 6_000 }));
trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
···
let (danglers, deleted) = self.0.trim_collection(collection, 512, false).inspect_err(|e| log::error!("trim error: {e:?}"))?;
total_danglers += danglers;
total_deleted += deleted;
- if total_deleted > 1_000_000 {
- log::info!("trim stopped early, more than 1M records already deleted.");
break;
}
}
···
let mut ended_early = false;
let mut batch = self.keyspace.batch();
for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
+ if !full_scan && i > 10_000_000 {
+ log::info!("stopping collection trim early: already scanned 10M elements");
ended_early = true;
break;
}
···
let mut dirty_nsids = HashSet::new();
let mut rollup =
+ tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 81_000 }));
+ rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut trim =
+ tokio::time::interval(Duration::from_millis(if backfill { 500 } else { 6_000 }));
trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
···
let (danglers, deleted) = self.0.trim_collection(collection, 512, false).inspect_err(|e| log::error!("trim error: {e:?}"))?;
total_danglers += danglers;
total_deleted += deleted;
+ if total_deleted > 100_000_000 {
+ log::info!("trim stopped early, more than 100M records already deleted.");
break;
}
}
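During backfill the maintenance cadence tightens considerably: rollups go from every 1ms to every 100µs and trims from every 3s to every 500ms (the live cadences of 81ms and 6s are unchanged), the rollup timer now skips missed ticks instead of delaying them, and the per-pass caps rise from 1M elements scanned to 10M and from 1M deletions to 100M. A sketch of the flag-driven interval setup (the tick bodies are placeholders, and the select-based loop is an assumption about the surrounding structure, not lifted from the crate):

    use std::time::Duration;
    use tokio::time::{interval, MissedTickBehavior};

    async fn maintenance_loop(backfill: bool) {
        // Backfill ingests far more events per second, so roll up and trim much more often.
        let mut rollup = interval(Duration::from_micros(if backfill { 100 } else { 81_000 }));
        rollup.set_missed_tick_behavior(MissedTickBehavior::Skip); // fine to drop ticks we can't keep up with
        let mut trim = interval(Duration::from_millis(if backfill { 500 } else { 6_000 }));
        trim.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = rollup.tick() => { /* roll up counts for dirty collections */ }
                _ = trim.tick() => { /* trim oldest live entries, capped per pass */ }
            }
        }
    }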