Tracks lexicons and how many times each one has appeared on the Jetstream.
1use std::{ops::Deref, sync::Arc}; 2 3use smol_str::ToSmolStr; 4#[cfg(not(target_env = "msvc"))] 5use tikv_jemallocator::Jemalloc; 6use tokio_util::sync::CancellationToken; 7use tracing::Level; 8use tracing_subscriber::EnvFilter; 9 10use crate::{ 11 api::serve, 12 db::{Db, EventRecord}, 13 error::AppError, 14 jetstream::JetstreamClient, 15}; 16 17mod api; 18mod db; 19mod error; 20mod jetstream; 21 22#[cfg(not(target_env = "msvc"))] 23#[global_allocator] 24static GLOBAL: Jemalloc = Jemalloc; 25 26#[tokio::main] 27async fn main() { 28 tracing_subscriber::fmt::fmt() 29 .with_env_filter( 30 EnvFilter::builder() 31 .with_default_directive(Level::INFO.into()) 32 .from_env_lossy(), 33 ) 34 .compact() 35 .init(); 36 37 if std::env::args() 38 .nth(1) 39 .map_or(false, |arg| arg == "migrate") 40 { 41 migrate_to_miniz(); 42 return; 43 } 44 45 let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db")); 46 47 rustls::crypto::ring::default_provider() 48 .install_default() 49 .expect("cant install rustls crypto provider"); 50 51 let mut jetstream = 52 match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") { 53 Ok(client) => client, 54 Err(err) => { 55 tracing::error!("can't create jetstream client: {err}"); 56 return; 57 } 58 }; 59 60 let cancel_token = CancellationToken::new(); 61 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 62 63 let consume_events = tokio::spawn({ 64 let consume_cancel = cancel_token.child_token(); 65 async move { 66 jetstream.connect().await?; 67 loop { 68 tokio::select! 
{ 69 maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event { 70 Ok(event) => { 71 let Some(record) = EventRecord::from_jetstream(event) else { 72 continue; 73 }; 74 let _ = event_tx.send(record).await; 75 } 76 Err(err) => return Err(err), 77 }, 78 _ = consume_cancel.cancelled() => break Ok(()), 79 } 80 } 81 } 82 }); 83 84 let ingest_events = std::thread::spawn({ 85 let db = db.clone(); 86 move || { 87 tracing::info!("starting ingest events thread..."); 88 while let Some(e) = event_rx.blocking_recv() { 89 if let Err(e) = db.record_event(e) { 90 tracing::error!("failed to record event: {}", e); 91 } 92 } 93 } 94 }); 95 96 tokio::select! { 97 res = serve(db, cancel_token.child_token()) => { 98 if let Err(e) = res { 99 tracing::error!("serve failed: {}", e); 100 } 101 } 102 res = consume_events => { 103 let err = 104 res 105 .map_err(AppError::from) 106 .and_then(std::convert::identity) 107 .expect_err("consume events cant return ok"); 108 tracing::error!("consume events failed: {}", err); 109 }, 110 _ = tokio::signal::ctrl_c() => { 111 tracing::info!("received ctrl+c!"); 112 cancel_token.cancel(); 113 } 114 } 115 116 tracing::info!("shutting down..."); 117 ingest_events 118 .join() 119 .expect("failed to join ingest events thread"); 120} 121 122fn migrate_to_miniz() { 123 let from = Db::new(".fjall_data").expect("couldnt create db"); 124 let to = Db::new(".fjall_data_miniz").expect("couldnt create db"); 125 126 let mut total_count = 0_u64; 127 for nsid in from.get_nsids() { 128 tracing::info!("migrating {} ...", nsid.deref()); 129 for hit in from.get_hits(&nsid, ..).expect("cant read hits") { 130 let (timestamp, data) = hit.expect("cant read event"); 131 to.record_event(EventRecord { 132 nsid: nsid.to_smolstr(), 133 timestamp, 134 deleted: data.deleted, 135 }) 136 .expect("cant record event"); 137 total_count += 1; 138 } 139 } 140 141 tracing::info!("migrated {total_count} events!"); 142}