Tracks lexicons (record NSIDs) and how many times they appear on the Jetstream.
use std::{ops::Deref, time::Duration};

use itertools::Itertools;
use rclite::Arc;
use smol_str::ToSmolStr;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;

use crate::{
    api::serve,
    db::{Db, DbConfig, EventRecord},
    error::AppError,
    jetstream::JetstreamClient,
    utils::{CLOCK, RelativeDateTime, get_time},
};

mod api;
mod db;
mod db_old;
mod error;
mod jetstream;
mod utils;

// Use jemalloc everywhere except MSVC, where snmalloc builds instead.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(target_env = "msvc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(
            EnvFilter::builder()
                .with_default_directive(Level::INFO.into())
                .from_env_lossy(),
        )
        .compact()
        .init();

    // One-off maintenance subcommands; no argument runs the ingester + API.
    match std::env::args().nth(1).as_deref() {
        Some("compact") => {
            compact();
            return;
        }
        Some("migrate") => {
            migrate();
            return;
        }
        Some("debug") => {
            debug();
            return;
        }
        Some(x) => {
            tracing::error!("unknown command: {}", x);
            return;
        }
        None => {}
    }

    let cancel_token = CancellationToken::new();

    let db = Arc::new(
        Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
    );

    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("cant install rustls crypto provider");

    let mut jetstream =
        match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
            Ok(client) => client,
            Err(err) => {
                tracing::error!("can't create jetstream client: {err}");
                return;
            }
        };

    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
    let consume_events = tokio::spawn({
        let consume_cancel = cancel_token.child_token();
        async move {
            jetstream.connect().await?;
            loop {
                tokio::select! {
                    maybe_event = jetstream.read(consume_cancel.child_token()) => match maybe_event {
                        Ok(event) => {
                            // Skip events that don't map to a trackable record.
                            let Some(record) = EventRecord::from_jetstream(event) else {
                                continue;
                            };
                            event_tx.send(record).await?;
                        }
                        Err(err) => return Err(err),
                    },
                    _ = consume_cancel.cancelled() => break Ok(()),
                }
            }
        }
    });
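
    // The bounded channel above (capacity 1000) gives backpressure: if the
    // ingest thread falls behind, `event_tx.send(...).await` parks the
    // consumer instead of buffering events without bound.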
    // Drain the channel on a dedicated OS thread so database writes never
    // block the async runtime. `recv_many` returning 0 means the channel is
    // closed, which (like shutdown) ends the loop.
    let ingest_events = std::thread::spawn({
        let db = db.clone();
        move || {
            let mut buffer = Vec::new();
            loop {
                let read = event_rx.blocking_recv_many(&mut buffer, 100);
                if let Err(err) = db.ingest_events(buffer.drain(..)) {
                    tracing::error!("failed to ingest events: {}", err);
                }
                if read == 0 || db.is_shutting_down() {
                    break;
                }
            }
        }
    });

    // Periodic maintenance: sync every 10 seconds, compact every 30 minutes.
    let db_task = tokio::task::spawn({
        let db = db.clone();
        async move {
            let sync_period = Duration::from_secs(10);
            let mut sync_interval = tokio::time::interval(sync_period);
            sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            let compact_period = Duration::from_secs(60 * 30); // 30 mins
            let mut compact_interval = tokio::time::interval(compact_period);
            compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                let sync_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            match db.sync(false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to sync db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                let compact_db = async || {
                    tokio::task::spawn_blocking({
                        let db = db.clone();
                        move || {
                            if db.is_shutting_down() {
                                return;
                            }
                            let end = get_time() - compact_period / 2;
                            let start = end - compact_period;
                            let range = start.as_secs()..end.as_secs();
                            tracing::info!(
                                {
                                    start = %RelativeDateTime::from_now(start),
                                    end = %RelativeDateTime::from_now(end),
                                },
                                "running compaction...",
                            );
                            match db.compact_all(db.cfg.max_block_size, range, false) {
                                Ok(_) => (),
                                Err(e) => tracing::error!("failed to compact db: {}", e),
                            }
                        }
                    })
                    .await
                    .unwrap();
                };
                tokio::select! {
                    _ = sync_interval.tick() => sync_db().await,
                    _ = compact_interval.tick() => compact_db().await,
                    _ = db.shutting_down() => break,
                }
            }
        }
    });
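
    // The compaction window above ends half a period behind "now" and spans
    // one full period, so each 30-minute pass covers the 30 minutes that
    // ended 15 minutes ago; presumably this keeps compaction away from
    // blocks that are still receiving writes.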
    tokio::select! {
        res = serve(db.clone(), cancel_token.child_token()) => {
            if let Err(e) = res {
                tracing::error!("serve failed: {}", e);
            }
        }
        res = consume_events => {
            // Flatten Result<Result<_, AppError>, JoinError>; reaching this
            // branch without cancellation means the consumer failed.
            let err = res
                .map_err(AppError::from)
                .and_then(std::convert::identity)
                .expect_err("consume events cant return ok");
            tracing::error!("consume events failed: {}", err);
        },
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("received ctrl+c!");
            cancel_token.cancel();
        }
    }

    // Orderly shutdown: stop producers, drain the ingest thread, stop the
    // maintenance task, then force a final durable sync.
    tracing::info!("shutting down...");
    cancel_token.cancel();
    ingest_events.join().expect("failed to join ingest events");
    db_task.await.expect("cant join db task");
    db.sync(true).expect("cant sync db");
}

fn debug() {
    let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    println!("disk size: {}", info.disk_size);
    // Print each NSID's block item counts, run-length encoded
    // ("128x4" = four consecutive blocks of 128 items).
    for (nsid, blocks) in info.nsids {
        print!("{nsid}:");
        let mut last_size = None;
        let mut run_len = 0;
        for item_count in blocks {
            if Some(item_count) == last_size {
                run_len += 1;
            } else {
                if run_len > 1 {
                    print!("x{run_len}");
                }
                print!(" {item_count}");
                run_len = 1;
            }
            last_size = Some(item_count);
        }
        // Flush the trailing run.
        if run_len > 1 {
            print!("x{run_len}");
        }
        println!();
    }
}

fn compact() {
    let db = Db::new(
        DbConfig::default().ks(|c| {
            c.max_journaling_size(u64::MAX)
                .max_write_buffer_size(u64::MAX)
        }),
        CancellationToken::new(),
    )
    .expect("couldnt create db");
    let info = db.info().expect("cant get db info");
    db.major_compact().expect("cant compact");
    // Give background compaction threads a moment to settle before re-reading.
    std::thread::sleep(Duration::from_secs(5));
    let compacted_info = db.info().expect("cant get db info");
    println!(
        "disk size: {} -> {}",
        info.disk_size, compacted_info.disk_size
    );
    for (nsid, blocks) in info.nsids {
        println!(
            "{nsid}: {} -> {}",
            blocks.len(),
            compacted_info.nsids[&nsid].len()
        )
    }
}
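
// Both `compact` and `migrate` raise `max_journaling_size` and
// `max_write_buffer_size` to `u64::MAX`. Assuming these forward to the
// underlying fjall keyspace's journal and write-buffer limits, that
// effectively keeps bulk work from stalling on journal rotation or flushes.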
fn migrate() {
    let cancel_token = CancellationToken::new();

    let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));

    let to = Arc::new(
        Db::new(
            DbConfig::default().path(".fjall_data_to").ks(|c| {
                c.max_journaling_size(u64::MAX)
                    .max_write_buffer_size(u64::MAX)
                    .compaction_workers(rayon::current_num_threads() * 4)
                    .flush_workers(rayon::current_num_threads() * 4)
            }),
            cancel_token.child_token(),
        )
        .expect("couldnt create db"),
    );

    let nsids = from.get_nsids().collect::<Vec<_>>();
    // Report ingest throughput every few seconds; the thread is never joined
    // and simply dies with the process.
    let _eps_thread = std::thread::spawn({
        let to = to.clone();
        move || {
            loop {
                std::thread::sleep(Duration::from_secs(3));
                let eps = to.eps();
                if eps > 0 {
                    tracing::info!("{} rps", eps);
                }
            }
        }
    });
    // One migration thread per NSID, each streaming hits in 100k chunks.
    let mut threads = Vec::with_capacity(nsids.len());
    let start = CLOCK.now();
    for nsid in nsids {
        let from = from.clone();
        let to = to.clone();
        threads.push(std::thread::spawn(move || {
            tracing::info!("{}: migrating...", nsid.deref());
            let mut count = 0_u64;
            for hits in from.get_hits(&nsid, ..).chunks(100_000).into_iter() {
                to.ingest_events(hits.map(|hit| {
                    count += 1;
                    let hit = hit.expect("cant decode hit");
                    EventRecord {
                        nsid: nsid.to_smolstr(),
                        timestamp: hit.timestamp,
                        deleted: hit.deser().unwrap().deleted,
                    }
                }))
                .expect("cant record event");
            }
            tracing::info!("{}: ingested {} events...", nsid.deref(), count);
            count
        }));
    }
    let mut total_count = 0_u64;
    for thread in threads {
        let count = thread.join().expect("thread panicked");
        total_count += count;
    }
    let read_time = start.elapsed();
    let read_per_second = total_count as f64 / read_time.as_secs_f64();
    drop(from);
    tracing::info!("starting sync!!!");
    to.sync(true).expect("cant sync");
    tracing::info!("persisting...");
    let total_time = start.elapsed();
    let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
    tracing::info!(
        "migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
    );
}
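
// Usage sketch (invocation assumed from the argument dispatch in main;
// the binary name here is hypothetical):
//   lexicon-tracker            ingest from the Jetstream and serve the API
//   lexicon-tracker compact    one-off major compaction, printing before/after sizes
//   lexicon-tracker migrate    copy events from .fjall_data_from into .fjall_data_to
//   lexicon-tracker debug      dump per-NSID block item counts, run-length encoded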