//! Tracks lexicons and how many times they appeared on the jetstream.
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

    // dispatch one-shot maintenance subcommands; no argument runs the server
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

    // jetstream reader: decodes events and forwards them into a bounded channel
    // until cancelled or the stream errors out
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });
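    // Ingest happens on a dedicated OS thread because `Db::ingest_events` blocks;
    // this keeps database writes off the tokio workers. The bounded channel above
    // (capacity 1000) applies backpressure to the jetstream reader, and
    // `blocking_recv_many` batches up to 100 events per ingest call.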
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                // returns 0 only once the channel is closed and drained
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

    // periodic maintenance: sync every 10s, compact the trailing 30min window every 30min
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            if let Err(e) = db.sync(false) {
                                tracing::error!("failed to sync db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            if let Err(e) = db.compact_all(db.cfg.max_block_size, range, false) {
                                tracing::error!("failed to compact db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });
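    // Run until the HTTP server exits, the jetstream consumer fails, or ctrl+c
    // arrives; every branch falls through to the shared shutdown path below.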
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // flatten the JoinError and the task's own error; the task returns Ok
            // only after cancellation, which can't have happened if this branch wins
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events cant return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

/// Dumps every stored hit, grouped by NSID, followed by a total count.
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("cant decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

/// Prints per-NSID block item counts, run-length encoded
/// (e.g. `128x4 17` for four 128-item blocks followed by one with 17 items).
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = None;
        let mut run_len = 0_usize;
        for item_count in blocks {
            if Some(item_count) == last_size {
                run_len += 1;
            } else {
                if run_len > 1 {
                    print!("x{run_len}");
                }
                print!(" {item_count}");
                run_len = 1;
            }
            last_size = Some(item_count);
        }
        // flush the marker for the final run, which the loop above never emits
        if run_len > 1 {
            print!("x{run_len}");
        }
        println!();
    }
}

/// Runs a major compaction and reports disk size and per-NSID block counts
/// before and after.
fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    // give background work a moment to settle before re-reading sizes
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            // don't panic if an nsid has no blocks left after compaction
            compacted_info.nsids.get(&nsid).map_or(0, |b| b.len())
        );
    }
}
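/// Copies every hit from the db at `.fjall_data_from` into a fresh db at
/// `.fjall_data_to`, one thread per NSID, logging destination throughput while
/// the copy runs.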
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    // report the destination's ingest rate every few seconds while migrating
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} events/s", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            // stream hits out of the source in 100k chunks to bound memory use
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        total_count += thread.join().expect("thread panicked");
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("cant sync");
    tracing::info!("persisted!");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}