tracks lexicons and how many times they appeared on the jetstream
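// Usage, inferred from the subcommand match in `main` below. The binary name
// `tracker` is a placeholder for whatever this crate builds as:
//
//   tracker            run the service: consume jetstream events, serve the API
//   tracker print      dump every stored hit, grouped by NSID
//   tracker debug      print disk size and per-NSID block item counts
//   tracker compact    run a major compaction and report the size change
//   tracker migrate    copy `.fjall_data_from` into `.fjall_data_to`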
use std::{ops::Deref, time::Duration, u64, usize};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod error;
mod jetstream;
mod utils;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some("print") => {
            print_all();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let urls = [
        "wss://jetstream1.us-west.bsky.network/subscribe",
        "wss://jetstream2.us-west.bsky.network/subscribe",
        "wss://jetstream2.fr.hose.cam/subscribe",
        "wss://jetstream.fire.hose.cam/subscribe",
    ];
    let mut jetstream = match JetstreamClient::new(urls) {
        Ok(client) => client,
        Err(err) => {
            tracing::error!("can't create jetstream client: {err}");
            return;
        }
    };

    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });
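    // The jetstream task above feeds a bounded channel (capacity 1000); the
    // dedicated OS thread below drains it in batches of up to 500 events so
    // that blocking db writes stay off the async runtime.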
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 500);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = std::time::Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time();
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });
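    // Run until the API server exits, the jetstream consumer fails, or ctrl+c
    // arrives; then cancel all tasks and do a final blocking sync of the db.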
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events cant return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

fn print_all() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let nsids = db.get_nsids().collect::<Vec<_>>();
    let mut count = 0_usize;
    for nsid in nsids {
        println!("{}:", nsid.deref());
        for hit in db.get_hits(&nsid, .., usize::MAX) {
            let hit = hit.expect("cant decode hit");
            println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
            count += 1;
        }
    }
    println!("total hits: {}", count);
}

fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = 0;
        let mut same_size_count = 0;
        for item_count in blocks {
            if item_count == last_size {
                same_size_count += 1;
            } else {
                if same_size_count > 1 {
                    print!("x{}", same_size_count);
                }
                print!(" {item_count}");
                same_size_count = 0;
            }
            last_size = item_count;
        }
        println!();
    }
}

fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}
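// One-shot migration between two on-disk databases: read every hit out of
// `.fjall_data_from` (one thread per NSID) and re-ingest it into
// `.fjall_data_to`, reporting read/write throughput at the end.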
fn migrate() {
    let cancel_token = CancellationToken::new();
    let from = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_from"),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );
    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} rps", eps);
                }
            }
        }
    });
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from
                .get_hits(&nsid, .., usize::MAX)
                .chunks(100000)
                .into_iter()
            {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("cant sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}
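// A minimal smoke-test sketch, not part of the original file: it pushes a few
// synthetic `EventRecord`s through the same `Db::ingest_events` path that
// `main` and `migrate` use. Field names follow the `EventRecord` construction
// in `migrate`; the test db path and the assumption that `timestamp` holds
// seconds (suggested by the `as_secs()` compaction range in `main`) are
// guesses.
#[cfg(test)]
mod smoke {
    use super::*;

    #[test]
    fn ingest_then_sync() {
        let db = Db::new(
            DbConfig::default().path(".fjall_data_test"), // hypothetical path
            CancellationToken::new(),
        )
        .expect("couldnt create db");
        let now = get_time().as_secs();
        db.ingest_events((0..10u64).map(|i| EventRecord {
            nsid: "app.bsky.feed.post".to_smolstr(),
            timestamp: now + i, // assumes seconds-since-epoch
            deleted: false,
        }))
        .expect("cant ingest events");
        db.sync(true).expect("cant sync db");
    }
}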