//! Tracks lexicons and how many times they appeared on the Jetstream.
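//!
//! Run with no arguments to start the ingest pipeline and API server; the
//! `compact`, `migrate`, `debug`, and `print` subcommands run one-off
//! maintenance tasks against the on-disk database instead.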
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

// Use jemalloc everywhere except MSVC targets, which it doesn't support.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

    // One-off maintenance subcommands; no argument means "run the server".
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldn't create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("can't install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

    // Bounded channel between the async Jetstream consumer and the blocking
    // ingest thread, so a slow ingest applies backpressure to the consumer.
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
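                // Race the next Jetstream event against cancellation: forward
                // decodable events to the ingest thread, bail the task on read
                // errors, and break cleanly once shutdown is requested.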
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            // Skip events that don't map to a record we track.
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

    // Ingest is blocking work, so it gets a dedicated OS thread that drains
    // the channel in batches. A batch read of 0 means the channel is closed
    // (the consumer task is gone), which also ends this thread.
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 500);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

    // Background maintenance: sync the db every 10 seconds and compact the
    // most recent window every 30 minutes, until shutdown.
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            if let Err(e) = db.sync(false) {
                                tracing::error!("failed to sync db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            // Only compact the window covered since the last run.
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            if let Err(e) = db.compact_all(db.cfg.max_block_size, range, false) {
                                tracing::error!("failed to compact db: {}", e);
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });
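    // Run until one of these finishes: the HTTP server exits, the Jetstream
    // consumer dies, or ctrl+c arrives. Whichever it is, fall through to the
    // shutdown sequence below.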
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // The consumer only returns through `?`, so flatten the join
            // result and report the error it carried.
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events can't return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    // Shutdown: cancel everything, wait for the ingest thread and maintenance
    // task to finish, then do a final blocking sync.
    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("can't join db task");
    db.sync(true).expect("can't sync db");
}

/// Dumps every stored hit, grouped by NSID, along with a total count.
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("can't decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

/// Prints per-NSID block sizes, run-length encoded, plus total disk usage.
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        // Run-length encode consecutive blocks with the same item count,
        // printing e.g. " 500x3" for three blocks of 500 items.
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size && same_size_count > 0 {
                same_size_count += 1;
            } else {
                if same_size_count > 1 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 1;
            }
            last_size = item_count;
        }
        // Flush the final run, which the loop above never prints.
        if same_size_count > 1 {
            print!("x{}", same_size_count);
        }
        println!();
    }
}

/// Runs a one-off major compaction and reports disk size and block counts
/// before and after.
fn compact() {
    let db = Db::new(
        // Lift the journal and write buffer limits for this one-shot run.
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldn't create db");
    let info = db.info().expect("can't get db info");
    db.major_compact().expect("can't compact");
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("can't get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

/// Copies every event from `.fjall_data_from` into a fresh `.fjall_data_to`,
/// one thread per NSID, and reports throughput.
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldn't create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    // Detached progress reporter: logs ingest throughput every few seconds
    // until the process exits.
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} rps", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
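        // One worker thread per NSID: stream hits out of the source db and
        // re-ingest them into the destination in 100k chunks.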
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("can't decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("can't record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("can't sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}
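// Usage sketch for the migration path (assumes the default cargo binary; the
// directory names are the ones hardcoded above):
//
//   cargo run --release -- migrate
//
// reads every hit out of `.fjall_data_from` and re-ingests it into
// `.fjall_data_to`.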