Tracks lexicons and how many times they appeared on the Jetstream.
1use std::{ops::Deref, sync::Arc};
2
3use smol_str::ToSmolStr;
4#[cfg(not(target_env = "msvc"))]
5use tikv_jemallocator::Jemalloc;
6use tokio_util::sync::CancellationToken;
7use tracing::Level;
8use tracing_subscriber::EnvFilter;
9
10use crate::{
11 api::serve,
12 db::{Db, EventRecord},
13 error::AppError,
14 jetstream::JetstreamClient,
15};
16
17mod api;
18mod db;
19mod error;
20mod jetstream;
21
22#[cfg(not(target_env = "msvc"))]
23#[global_allocator]
24static GLOBAL: Jemalloc = Jemalloc;
25
26#[tokio::main]
27async fn main() {
28 tracing_subscriber::fmt::fmt()
29 .with_env_filter(
30 EnvFilter::builder()
31 .with_default_directive(Level::INFO.into())
32 .from_env_lossy(),
33 )
34 .compact()
35 .init();
36
37 if std::env::args()
38 .nth(1)
39 .map_or(false, |arg| arg == "migrate")
40 {
41 migrate_to_miniz();
42 return;
43 }
44
45 let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
46
47 rustls::crypto::ring::default_provider()
48 .install_default()
49 .expect("cant install rustls crypto provider");
50
51 let mut jetstream =
52 match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
53 Ok(client) => client,
54 Err(err) => {
55 tracing::error!("can't create jetstream client: {err}");
56 return;
57 }
58 };
59
60 let cancel_token = CancellationToken::new();
61 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
62
63 let consume_events = tokio::spawn({
64 let consume_cancel = cancel_token.child_token();
65 async move {
66 jetstream.connect().await?;
67 loop {
68 tokio::select! {
69 maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
70 Ok(event) => {
71 let Some(record) = EventRecord::from_jetstream(event) else {
72 continue;
73 };
74 let _ = event_tx.send(record).await;
75 }
76 Err(err) => return Err(err),
77 },
78 _ = consume_cancel.cancelled() => break Ok(()),
79 }
80 }
81 }
82 });
83
84 let ingest_events = std::thread::spawn({
85 let db = db.clone();
86 move || {
87 tracing::info!("starting ingest events thread...");
88 while let Some(e) = event_rx.blocking_recv() {
89 if let Err(e) = db.record_event(e) {
90 tracing::error!("failed to record event: {}", e);
91 }
92 }
93 }
94 });
95
96 tokio::select! {
97 res = serve(db, cancel_token.child_token()) => {
98 if let Err(e) = res {
99 tracing::error!("serve failed: {}", e);
100 }
101 }
102 res = consume_events => {
103 let err =
104 res
105 .map_err(AppError::from)
106 .and_then(std::convert::identity)
107 .expect_err("consume events cant return ok");
108 tracing::error!("consume events failed: {}", err);
109 },
110 _ = tokio::signal::ctrl_c() => {
111 tracing::info!("received ctrl+c!");
112 cancel_token.cancel();
113 }
114 }
115
116 tracing::info!("shutting down...");
117 ingest_events
118 .join()
119 .expect("failed to join ingest events thread");
120}
121
122fn migrate_to_miniz() {
123 let from = Db::new(".fjall_data").expect("couldnt create db");
124 let to = Db::new(".fjall_data_miniz").expect("couldnt create db");
125
126 let mut total_count = 0_u64;
127 for nsid in from.get_nsids() {
128 tracing::info!("migrating {} ...", nsid.deref());
129 for hit in from.get_hits(&nsid, ..).expect("cant read hits") {
130 let (timestamp, data) = hit.expect("cant read event");
131 to.record_event(EventRecord {
132 nsid: nsid.to_smolstr(),
133 timestamp,
134 deleted: data.deleted,
135 })
136 .expect("cant record event");
137 total_count += 1;
138 }
139 }
140
141 tracing::info!("migrated {total_count} events!");
142}