Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use clap::Parser; 2use jetstream::events::Cursor; 3use metrics::{describe_gauge, gauge, Unit}; 4use metrics_exporter_prometheus::PrometheusBuilder; 5use std::path::PathBuf; 6use std::time::{Duration, SystemTime}; 7use tokio::task::JoinSet; 8use ufos::consumer; 9use ufos::file_consumer; 10use ufos::server; 11use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 12use ufos::storage_fjall::FjallStorage; 13use ufos::store_types::SketchSecretPrefix; 14use ufos::{nice_duration, ConsumerInfo}; 15 16#[cfg(not(target_env = "msvc"))] 17use tikv_jemallocator::Jemalloc; 18 19#[cfg(not(target_env = "msvc"))] 20#[global_allocator] 21static GLOBAL: Jemalloc = Jemalloc; 22 23/// Aggregate links in the at-mosphere 24#[derive(Parser, Debug, Clone)] 25#[command(version, about, long_about = None)] 26struct Args { 27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 29 #[arg(long)] 30 jetstream: String, 31 /// allow changing jetstream endpoints 32 #[arg(long, action)] 33 jetstream_force: bool, 34 /// don't request zstd-compressed jetstream events 35 /// 36 /// reduces CPU at the expense of more ingress bandwidth 37 #[arg(long, action)] 38 jetstream_no_zstd: bool, 39 /// Location to store persist data to disk 40 #[arg(long)] 41 data: PathBuf, 42 /// DEBUG: don't start the jetstream consumer or its write loop 43 #[arg(long, action)] 44 pause_writer: bool, 45 /// Adjust runtime settings like background task intervals for efficient backfill 46 #[arg(long, action)] 47 backfill: bool, 48 /// DEBUG: force the rw loop to fall behind by pausing it 49 /// todo: restore this 50 #[arg(long, action)] 51 pause_rw: bool, 52 /// reset the rollup cursor, scrape through missed things in the past (backfill) 53 #[arg(long, action)] 54 reroll: bool, 55 /// DEBUG: interpret jetstream as a file fixture 56 #[arg(long, action)] 57 jetstream_fixture: bool, 58} 59 60#[tokio::main] 61async fn main() -> anyhow::Result<()> { 62 env_logger::init(); 63 64 let args = Args::parse(); 65 let jetstream = args.jetstream.clone(); 66 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init( 67 args.data.clone(), 68 jetstream, 69 args.jetstream_force, 70 Default::default(), 71 )?; 72 go(args, read_store, write_store, cursor, sketch_secret).await?; 73 Ok(()) 74} 75 76async fn go<B: StoreBackground + 'static>( 77 args: Args, 78 read_store: impl StoreReader + 'static + Clone, 79 mut write_store: impl StoreWriter<B> + 'static, 80 cursor: Option<Cursor>, 81 sketch_secret: SketchSecretPrefix, 82) -> anyhow::Result<()> { 83 let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 84 85 println!("starting server with storage..."); 86 let serving = server::serve(read_store.clone()); 87 tasks.spawn(async move { 88 serving.await.map_err(|e| { 89 log::warn!("server ended: {e}"); 90 anyhow::anyhow!(e) 91 }) 92 }); 93 94 if args.pause_writer { 95 log::info!("not starting jetstream or the write loop."); 96 for t in tasks.join_all().await { 97 if let Err(e) = t { 98 return Err(anyhow::anyhow!(e)); 99 } 100 } 101 return Ok(()); 102 } 103 104 let batches = if args.jetstream_fixture { 105 log::info!("starting with jestream file fixture: {:?}", args.jetstream); 106 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await? 107 } else { 108 log::info!( 109 "starting consumer with cursor: {cursor:?} from {:?} ago", 110 cursor.map(|c| c.elapsed()) 111 ); 112 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await? 113 }; 114 115 let rolling = write_store 116 .background_tasks(args.reroll)? 117 .run(args.backfill); 118 tasks.spawn(async move { 119 rolling 120 .await 121 .inspect_err(|e| log::warn!("rollup ended: {e}"))?; 122 Ok(()) 123 }); 124 125 tasks.spawn(async move { 126 write_store 127 .receive_batches(batches) 128 .await 129 .inspect_err(|e| log::warn!("consumer ended: {e}"))?; 130 Ok(()) 131 }); 132 133 tasks.spawn(async move { 134 do_update_stuff(read_store).await; 135 log::warn!("status task ended"); 136 Ok(()) 137 }); 138 139 install_metrics_server()?; 140 141 for (i, t) in tasks.join_all().await.iter().enumerate() { 142 log::warn!("task {i} done: {t:?}"); 143 } 144 145 println!("bye!"); 146 147 Ok(()) 148} 149 150fn install_metrics_server() -> anyhow::Result<()> { 151 log::info!("installing metrics server..."); 152 let host = [0, 0, 0, 0]; 153 let port = 8765; 154 PrometheusBuilder::new() 155 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 156 .set_bucket_duration(Duration::from_secs(60))? 157 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 158 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 159 .with_http_listener((host, port)) 160 .install()?; 161 log::info!( 162 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 163 host[0], 164 host[1], 165 host[2], 166 host[3] 167 ); 168 Ok(()) 169} 170 171async fn do_update_stuff(read_store: impl StoreReader) { 172 describe_gauge!( 173 "persisted_cursor_age", 174 Unit::Microseconds, 175 "microseconds between our clock and the latest persisted event's cursor" 176 ); 177 describe_gauge!( 178 "rollup_cursor_age", 179 Unit::Microseconds, 180 "microseconds between our clock and the latest rollup cursor" 181 ); 182 let started_at = std::time::SystemTime::now(); 183 let mut first_cursor = None; 184 let mut first_rollup = None; 185 let mut last_at = std::time::SystemTime::now(); 186 let mut last_cursor = None; 187 let mut last_rollup = None; 188 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4)); 189 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 190 loop { 191 interval.tick().await; 192 read_store.update_metrics(); 193 match read_store.get_consumer_info().await { 194 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"), 195 Ok(ConsumerInfo::Jetstream { 196 latest_cursor, 197 rollup_cursor, 198 .. 199 }) => { 200 let now = std::time::SystemTime::now(); 201 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64); 202 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64); 203 backfill_info( 204 latest_cursor, 205 rollup_cursor, 206 last_cursor, 207 last_rollup, 208 last_at, 209 first_cursor, 210 first_rollup, 211 started_at, 212 now, 213 ); 214 first_cursor = first_cursor.or(latest_cursor); 215 first_rollup = first_rollup.or(rollup_cursor); 216 last_cursor = latest_cursor; 217 last_rollup = rollup_cursor; 218 last_at = now; 219 } 220 } 221 } 222} 223 224#[allow(clippy::too_many_arguments)] 225fn backfill_info( 226 latest_cursor: Option<Cursor>, 227 rollup_cursor: Option<Cursor>, 228 last_cursor: Option<Cursor>, 229 last_rollup: Option<Cursor>, 230 last_at: SystemTime, 231 first_cursor: Option<Cursor>, 232 first_rollup: Option<Cursor>, 233 started_at: SystemTime, 234 now: SystemTime, 235) { 236 if let Some(cursor) = latest_cursor { 237 gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64()); 238 } 239 if let Some(cursor) = rollup_cursor { 240 gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64()); 241 } 242 243 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later) 244 { 245 (Some(earlier), Some(later)) => match later.duration_since(&earlier) { 246 Ok(dt) => nice_duration(dt), 247 Err(e) => { 248 let rev_dt = e.duration(); 249 format!("+{}", nice_duration(rev_dt)) 250 } 251 }, 252 _ => "unknown".to_string(), 253 }; 254 255 let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| { 256 mlatest 257 .zip(msince) 258 .map(|(latest, since)| { 259 latest 260 .duration_since(&since) 261 .unwrap_or(Duration::from_millis(1)) 262 }) 263 .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64())) 264 .unwrap_or("??".into()) 265 }; 266 267 let dt_real = now 268 .duration_since(last_at) 269 .unwrap_or(Duration::from_millis(1)); 270 271 let dt_real_total = now 272 .duration_since(started_at) 273 .unwrap_or(Duration::from_millis(1)); 274 275 let cursor_rate = rate(latest_cursor, last_cursor, dt_real); 276 let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total); 277 278 let rollup_rate = rate(rollup_cursor, last_rollup, dt_real); 279 let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total); 280 281 log::trace!( 282 "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).", 283 latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 284 nice_dt_two_maybes(last_cursor, latest_cursor), 285 rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 286 nice_dt_two_maybes(last_rollup, rollup_cursor), 287 ); 288}