//! Tracks lexicons and how many times they appeared on the Jetstream.
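//!
//! The ingest pipeline: a jetstream websocket consumer task decodes events
//! into `EventRecord`s and sends them over a bounded channel to a dedicated
//! ingest thread, which writes them to the db in batches. A background task
//! syncs the db every 10 seconds and compacts the most recent window every
//! 30 minutes, while an HTTP API serves queries until ctrl+c shuts everything
//! down.
//!
//! The `compact`, `migrate`, `debug`, and `print` subcommands run one-off
//! maintenance jobs instead of starting the server.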

use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

// jemalloc everywhere except MSVC, where snmalloc is used instead
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(target_env = "msvc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

    // decode jetstream events on the async side and hand them to the ingest
    // thread over a bounded channel, so a slow db write backpressures the reader
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });

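    // db writes are blocking, so they run on a dedicated OS thread that
    // drains the channel in batches of up to 100 events per write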
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                // blocks until at least one event arrives, reading up to 100
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                // recv_many returns 0 only once the channel has been closed
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

    // periodic maintenance: sync every 10s, compact the most recent window
    // every 30 minutes, both on the blocking thread pool
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            // compact only the window written since the last run
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });

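    // run until the API server exits, the jetstream consumer dies, or ctrl+c
    // arrives; then cancel everything and flush the db one last time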
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // flatten the JoinError layer; the consumer only returns Ok after
            // cancellation, so completing this select branch means an error
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events cant return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

/// Dumps every stored hit for every nsid to stdout.
fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("cant decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

/// Prints the item count of every block for every nsid, run-length encoding
/// repeated counts as `{count}x{repeats}`.
fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        // collapse runs of identically sized blocks so long runs stay readable
        for (repeats, item_count) in blocks.into_iter().dedup_with_count() {
            if repeats > 1 {
                print!(" {item_count}x{repeats}");
            } else {
                print!(" {item_count}");
            }
        }
        println!();
    }
}

/// Runs a major compaction and reports disk size and block counts before and
/// after.
fn compact() {
    let db = Db::new(
        // effectively unbounded journal and write buffer for the one-off job
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    // give background work a moment to settle before measuring again
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}

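/// Copies every event from the db at `.fjall_data_from` into a fresh db at
/// `.fjall_data_to`, one reader thread per nsid, normalizing microsecond
/// timestamps to seconds along the way. A side thread logs ingest throughput
/// while the copy runs.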
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );
    let to = Arc::new(
        Db::new(
            // effectively unbounded journal/write buffer plus extra workers,
            // since this is a one-off bulk load
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} events/s", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100_000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    let mut timestamp = hit.timestamp;
                    // timestamps too large to be seconds (~year 2286) must be
                    // microseconds, so convert them to seconds
                    if timestamp > 10_000_000_000 {
                        timestamp /= 1_000_000;
                    }
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync...");
    to.sync(true).expect("cant sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}