Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use clap::Parser; 2use jetstream::events::Cursor; 3use metrics::{describe_gauge, gauge, Unit}; 4use metrics_exporter_prometheus::PrometheusBuilder; 5use std::path::PathBuf; 6use std::time::{Duration, SystemTime}; 7use ufos::consumer; 8use ufos::file_consumer; 9use ufos::server; 10use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 11use ufos::storage_fjall::FjallStorage; 12use ufos::store_types::SketchSecretPrefix; 13use ufos::{nice_duration, ConsumerInfo}; 14 15#[cfg(not(target_env = "msvc"))] 16use tikv_jemallocator::Jemalloc; 17 18#[cfg(not(target_env = "msvc"))] 19#[global_allocator] 20static GLOBAL: Jemalloc = Jemalloc; 21 22/// Aggregate links in the at-mosphere 23#[derive(Parser, Debug, Clone)] 24#[command(version, about, long_about = None)] 25struct Args { 26 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 27 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 28 #[arg(long)] 29 jetstream: String, 30 /// allow changing jetstream endpoints 31 #[arg(long, action)] 32 jetstream_force: bool, 33 /// don't request zstd-compressed jetstream events 34 /// 35 /// reduces CPU at the expense of more ingress bandwidth 36 #[arg(long, action)] 37 jetstream_no_zstd: bool, 38 /// Location to store persist data to disk 39 #[arg(long)] 40 data: PathBuf, 41 /// DEBUG: don't start the jetstream consumer or its write loop 42 #[arg(long, action)] 43 pause_writer: bool, 44 /// Adjust runtime settings like background task intervals for efficient backfill 45 #[arg(long, action)] 46 backfill: bool, 47 /// DEBUG: force the rw loop to fall behind by pausing it 48 /// todo: restore this 49 #[arg(long, action)] 50 pause_rw: bool, 51 /// reset the rollup cursor, scrape through missed things in the past (backfill) 52 #[arg(long, action)] 53 reroll: bool, 54 /// DEBUG: interpret jetstream as a file fixture 55 #[arg(long, action)] 56 jetstream_fixture: bool, 57} 58 59#[tokio::main] 60async fn main() -> anyhow::Result<()> { 61 env_logger::init(); 62 63 let args = Args::parse(); 64 let jetstream = args.jetstream.clone(); 65 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init( 66 args.data.clone(), 67 jetstream, 68 args.jetstream_force, 69 Default::default(), 70 )?; 71 go(args, read_store, write_store, cursor, sketch_secret).await?; 72 Ok(()) 73} 74 75async fn go<B: StoreBackground>( 76 args: Args, 77 read_store: impl StoreReader + 'static + Clone, 78 mut write_store: impl StoreWriter<B> + 'static, 79 cursor: Option<Cursor>, 80 sketch_secret: SketchSecretPrefix, 81) -> anyhow::Result<()> { 82 println!("starting server with storage..."); 83 let serving = server::serve(read_store.clone()); 84 85 if args.pause_writer { 86 log::info!("not starting jetstream or the write loop."); 87 serving.await.map_err(|e| anyhow::anyhow!(e))?; 88 return Ok(()); 89 } 90 91 let batches = if args.jetstream_fixture { 92 log::info!("starting with jestream file fixture: {:?}", args.jetstream); 93 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await? 94 } else { 95 log::info!( 96 "starting consumer with cursor: {cursor:?} from {:?} ago", 97 cursor.map(|c| c.elapsed()) 98 ); 99 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await? 100 }; 101 102 let rolling = write_store 103 .background_tasks(args.reroll)? 104 .run(args.backfill); 105 let consuming = write_store.receive_batches(batches); 106 107 let stating = do_update_stuff(read_store); 108 109 install_metrics_server()?; 110 111 tokio::select! { 112 z = serving => log::warn!("serve task ended: {z:?}"), 113 z = rolling => log::warn!("rollup task ended: {z:?}"), 114 z = consuming => log::warn!("consuming task ended: {z:?}"), 115 z = stating => log::warn!("status task ended: {z:?}"), 116 }; 117 118 println!("bye!"); 119 120 Ok(()) 121} 122 123fn install_metrics_server() -> anyhow::Result<()> { 124 log::info!("installing metrics server..."); 125 let host = [0, 0, 0, 0]; 126 let port = 8765; 127 PrometheusBuilder::new() 128 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 129 .set_bucket_duration(Duration::from_secs(60))? 130 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 131 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 132 .with_http_listener((host, port)) 133 .install()?; 134 log::info!( 135 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 136 host[0], 137 host[1], 138 host[2], 139 host[3] 140 ); 141 Ok(()) 142} 143 144async fn do_update_stuff(read_store: impl StoreReader) { 145 describe_gauge!( 146 "persisted_cursor_age", 147 Unit::Microseconds, 148 "microseconds between our clock and the latest persisted event's cursor" 149 ); 150 describe_gauge!( 151 "rollup_cursor_age", 152 Unit::Microseconds, 153 "microseconds between our clock and the latest rollup cursor" 154 ); 155 let started_at = std::time::SystemTime::now(); 156 let mut first_cursor = None; 157 let mut first_rollup = None; 158 let mut last_at = std::time::SystemTime::now(); 159 let mut last_cursor = None; 160 let mut last_rollup = None; 161 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4)); 162 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 163 loop { 164 interval.tick().await; 165 match read_store.get_consumer_info().await { 166 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"), 167 Ok(ConsumerInfo::Jetstream { 168 latest_cursor, 169 rollup_cursor, 170 .. 171 }) => { 172 let now = std::time::SystemTime::now(); 173 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64); 174 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64); 175 backfill_info( 176 latest_cursor, 177 rollup_cursor, 178 last_cursor, 179 last_rollup, 180 last_at, 181 first_cursor, 182 first_rollup, 183 started_at, 184 now, 185 ); 186 first_cursor = first_cursor.or(latest_cursor); 187 first_rollup = first_rollup.or(rollup_cursor); 188 last_cursor = latest_cursor; 189 last_rollup = rollup_cursor; 190 last_at = now; 191 } 192 } 193 } 194} 195 196#[allow(clippy::too_many_arguments)] 197fn backfill_info( 198 latest_cursor: Option<Cursor>, 199 rollup_cursor: Option<Cursor>, 200 last_cursor: Option<Cursor>, 201 last_rollup: Option<Cursor>, 202 last_at: SystemTime, 203 first_cursor: Option<Cursor>, 204 first_rollup: Option<Cursor>, 205 started_at: SystemTime, 206 now: SystemTime, 207) { 208 if let Some(cursor) = latest_cursor { 209 gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64()); 210 } 211 if let Some(cursor) = rollup_cursor { 212 gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64()); 213 } 214 215 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later) 216 { 217 (Some(earlier), Some(later)) => match later.duration_since(&earlier) { 218 Ok(dt) => nice_duration(dt), 219 Err(e) => { 220 let rev_dt = e.duration(); 221 format!("+{}", nice_duration(rev_dt)) 222 } 223 }, 224 _ => "unknown".to_string(), 225 }; 226 227 let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| { 228 mlatest 229 .zip(msince) 230 .map(|(latest, since)| { 231 latest 232 .duration_since(&since) 233 .unwrap_or(Duration::from_millis(1)) 234 }) 235 .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64())) 236 .unwrap_or("??".into()) 237 }; 238 239 let dt_real = now 240 .duration_since(last_at) 241 .unwrap_or(Duration::from_millis(1)); 242 243 let dt_real_total = now 244 .duration_since(started_at) 245 .unwrap_or(Duration::from_millis(1)); 246 247 let cursor_rate = rate(latest_cursor, last_cursor, dt_real); 248 let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total); 249 250 let rollup_rate = rate(rollup_cursor, last_rollup, dt_real); 251 let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total); 252 253 log::trace!( 254 "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).", 255 latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 256 nice_dt_two_maybes(last_cursor, latest_cursor), 257 rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 258 nice_dt_two_maybes(last_rollup, rollup_cursor), 259 ); 260}