tracks lexicons and how many times they appeared on the jetstream

feat(server): enable jetstream compression and use just a single thread to ingest the events

ptr.pet 85f997c4 9bab265d

verified
Changed files
+87 -34
server
+1
.gitignore
···
events.sqlite-shm
result
+
server/bsky_zstd_dictionary
server/src/bsky_zstd_dictionary

This is a binary file and will not be displayed.

+40 -5
server/src/db.rs
···
+
use atproto_jetstream::JetstreamEvent;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::SmolStr;
···
pub deleted: bool,
}
+
pub struct EventRecord {
+
nsid: SmolStr,
+
timestamp: u64,
+
deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
// counts is nsid -> NsidCounts
// hits is tree per nsid: timestamp -> NsidHit
pub struct Db {
···
}))
}
-
pub fn record_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
-
self.insert_event(nsid, timestamp, deleted)?;
+
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
+
let EventRecord {
+
nsid,
+
timestamp,
+
deleted,
+
} = e;
+
+
self.insert_event(&nsid, timestamp, deleted)?;
// increment count
-
let mut counts = self.get_count(nsid)?;
+
let mut counts = self.get_count(&nsid)?;
counts.last_seen = timestamp;
if deleted {
counts.deleted_count += 1;
} else {
counts.count += 1;
}
-
self.insert_count(nsid, counts.clone())?;
+
self.insert_count(&nsid, counts.clone())?;
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(nsid), counts));
+
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
}
Ok(())
}
+46 -29
server/src/main.rs
···
use std::sync::Arc;
use atproto_jetstream::{CancellationToken, Consumer, EventHandler, JetstreamEvent};
+
use tokio::sync::mpsc::{Receiver, Sender};
-
use crate::{api::serve, db::Db};
+
use crate::{
+
api::serve,
+
db::{Db, EventRecord},
+
};
mod api;
mod db;
mod error;
+
+
const BSKY_ZSTD_DICT: &[u8] = include_bytes!("./bsky_zstd_dictionary");
struct JetstreamHandler {
-
db: Arc<Db>,
+
tx: Sender<EventRecord>,
+
}
+
+
impl JetstreamHandler {
+
fn new() -> (Self, Receiver<EventRecord>) {
+
let (tx, rx) = tokio::sync::mpsc::channel(1000);
+
(Self { tx }, rx)
+
}
}
#[async_trait::async_trait]
impl EventHandler for JetstreamHandler {
async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
-
let db = self.db.clone();
-
tokio::task::spawn_blocking(move || {
-
let result = match event {
-
JetstreamEvent::Commit {
-
time_us, commit, ..
-
} => db.record_event(&commit.collection, time_us, false),
-
JetstreamEvent::Delete {
-
time_us, commit, ..
-
} => db.record_event(&commit.collection, time_us, true),
-
_ => Ok(()),
-
};
-
if let Err(err) = result {
-
tracing::error!("couldn't record event: {err}");
-
}
-
});
+
if let Some(e) = EventRecord::from_jetstream(event) {
+
self.tx.send(e).await?;
+
}
Ok(())
}
···
#[tokio::main]
async fn main() {
-
tracing_subscriber::fmt::init();
+
tracing_subscriber::fmt::fmt().compact().init();
let db = Arc::new(Db::new().expect("couldnt create db"));
-
let consumer = Consumer::new(atproto_jetstream::ConsumerTaskConfig {
-
compression: false,
+
tokio::fs::write("./bsky_zstd_dictionary", BSKY_ZSTD_DICT)
+
.await
+
.expect("could not write bsky zstd dict");
+
+
let jetstream = Consumer::new(atproto_jetstream::ConsumerTaskConfig {
+
compression: true,
jetstream_hostname: "jetstream2.us-west.bsky.network".into(),
collections: Vec::new(),
dids: Vec::new(),
max_message_size_bytes: None,
cursor: None,
require_hello: true,
-
zstd_dictionary_location: String::new(),
+
zstd_dictionary_location: "./bsky_zstd_dictionary".into(),
user_agent: "nsid-tracker/0.0.1".into(),
});
-
tracing::info!("running jetstream consumer...");
+
let (event_handler, mut event_rx) = JetstreamHandler::new();
+
let cancel_token = CancellationToken::new();
-
tokio::spawn({
+
tokio::spawn(async move {
+
jetstream
+
.register_handler(Arc::new(event_handler))
+
.await
+
.expect("cant register handler");
+
jetstream
+
.run_background(cancel_token.clone())
+
.await
+
.expect("cant run jetstream");
+
});
+
+
std::thread::spawn({
let db = db.clone();
-
async move {
-
consumer
-
.register_handler(Arc::new(JetstreamHandler { db }))
-
.await
-
.unwrap();
-
consumer.run_background(cancel_token.clone()).await.unwrap();
+
move || {
+
while let Some(e) = event_rx.blocking_recv() {
+
if let Err(e) = db.record_event(e) {
+
tracing::error!("failed to record event: {}", e);
+
}
+
}
}
});