//! Tracks lexicons and how many times they appeared on the Jetstream.
1use std::{ops::Deref, sync::Arc}; 2 3use smol_str::ToSmolStr; 4#[cfg(not(target_env = "msvc"))] 5use tikv_jemallocator::Jemalloc; 6use tokio_util::sync::CancellationToken; 7use tracing::Level; 8use tracing_subscriber::EnvFilter; 9 10use crate::{ 11 api::serve, 12 db::{Db, EventRecord}, 13 error::AppError, 14 jetstream::JetstreamClient, 15}; 16 17mod api; 18mod db; 19mod error; 20mod jetstream; 21mod utils; 22 23#[cfg(not(target_env = "msvc"))] 24#[global_allocator] 25static GLOBAL: Jemalloc = Jemalloc; 26 27#[tokio::main] 28async fn main() { 29 tracing_subscriber::fmt::fmt() 30 .with_env_filter( 31 EnvFilter::builder() 32 .with_default_directive(Level::INFO.into()) 33 .from_env_lossy(), 34 ) 35 .compact() 36 .init(); 37 38 match std::env::args().nth(1).as_deref() { 39 Some("compact") => { 40 compact(); 41 return; 42 } 43 Some("debug") => { 44 debug(); 45 return; 46 } 47 Some(x) => { 48 tracing::error!("unknown command: {}", x); 49 return; 50 } 51 None => {} 52 } 53 54 let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db")); 55 56 rustls::crypto::ring::default_provider() 57 .install_default() 58 .expect("cant install rustls crypto provider"); 59 60 let mut jetstream = 61 match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") { 62 Ok(client) => client, 63 Err(err) => { 64 tracing::error!("can't create jetstream client: {err}"); 65 return; 66 } 67 }; 68 69 let cancel_token = CancellationToken::new(); 70 71 let consume_events = tokio::spawn({ 72 let consume_cancel = cancel_token.child_token(); 73 let db = db.clone(); 74 async move { 75 jetstream.connect().await?; 76 loop { 77 tokio::select! 
{ 78 maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event { 79 Ok(event) => { 80 let Some(record) = EventRecord::from_jetstream(event) else { 81 continue; 82 }; 83 let db = db.clone(); 84 tokio::task::spawn_blocking(move || { 85 if let Err(err) = db.record_event(record) { 86 tracing::error!("failed to record event: {}", err); 87 } 88 }); 89 } 90 Err(err) => return Err(err), 91 }, 92 _ = consume_cancel.cancelled() => break Ok(()), 93 } 94 } 95 } 96 }); 97 98 std::thread::spawn({ 99 let db = db.clone(); 100 move || { 101 loop { 102 match db.sync(false) { 103 Ok(_) => (), 104 Err(e) => tracing::error!("failed to sync db: {}", e), 105 } 106 std::thread::sleep(std::time::Duration::from_secs(1)); 107 } 108 } 109 }); 110 111 tokio::select! { 112 res = serve(db.clone(), cancel_token.child_token()) => { 113 if let Err(e) = res { 114 tracing::error!("serve failed: {}", e); 115 } 116 } 117 res = consume_events => { 118 let err = 119 res 120 .map_err(AppError::from) 121 .and_then(std::convert::identity) 122 .expect_err("consume events cant return ok"); 123 tracing::error!("consume events failed: {}", err); 124 }, 125 _ = tokio::signal::ctrl_c() => { 126 tracing::info!("received ctrl+c!"); 127 cancel_token.cancel(); 128 } 129 } 130 131 tracing::info!("shutting down..."); 132 db.sync(true).expect("couldnt sync db"); 133} 134 135fn debug() { 136 let db = Db::new(".fjall_data").expect("couldnt create db"); 137 for nsid in db.get_nsids() { 138 let nsid = nsid.deref(); 139 for hit in db.get_hits(nsid, ..) 
{ 140 let hit = hit.expect("cant read event"); 141 println!("{nsid} {}", hit.timestamp); 142 } 143 } 144} 145 146fn compact() { 147 let from = Arc::new(Db::new(".fjall_data_from").expect("couldnt create db")); 148 let to = Arc::new(Db::new(".fjall_data_to").expect("couldnt create db")); 149 150 let mut threads = Vec::new(); 151 for nsid in from.get_nsids() { 152 let from = from.clone(); 153 let to = to.clone(); 154 threads.push(std::thread::spawn(move || { 155 tracing::info!("migrating {} ...", nsid.deref()); 156 let mut count = 0_u64; 157 for hit in from.get_hits(&nsid, ..) { 158 let hit = hit.expect("cant read event"); 159 let data = hit.access(); 160 to.record_event(EventRecord { 161 nsid: nsid.to_smolstr(), 162 timestamp: hit.timestamp, 163 deleted: data.deleted, 164 }) 165 .expect("cant record event"); 166 count += 1; 167 } 168 count 169 })); 170 } 171 172 let mut total_count = 0_u64; 173 for thread in threads { 174 total_count += thread.join().expect("thread panicked"); 175 } 176 to.sync(true).expect("cant sync"); 177 tracing::info!("migrated {total_count} events!"); 178}