Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use clap::Parser;
use jetstream::events::Cursor;
use metrics::{describe_gauge, gauge, Unit};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use tokio::task::JoinSet;
use ufos::consumer;
use ufos::file_consumer;
use ufos::server;
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use ufos::storage_fjall::FjallStorage;
use ufos::store_types::SketchSecretPrefix;
use ufos::{nice_duration, ConsumerInfo};

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// Aggregate links in the at-mosphere
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --jetstream-fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// allow changing jetstream endpoints
    #[arg(long, action)]
    jetstream_force: bool,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// Location to persist data to on disk
    #[arg(long)]
    data: PathBuf,
    /// DEBUG: don't start the jetstream consumer or its write loop
    #[arg(long, action)]
    pause_writer: bool,
    /// Adjust runtime settings like background task intervals for efficient backfill
    #[arg(long, action)]
    backfill: bool,
    /// DEBUG: force the rw loop to fall behind by pausing it
    /// todo: restore this
    #[arg(long, action)]
    pause_rw: bool,
    /// reset the rollup cursor, scrape through missed things in the past (backfill)
    #[arg(long, action)]
    reroll: bool,
    /// DEBUG: interpret jetstream as a file fixture
    #[arg(long, action)]
    jetstream_fixture: bool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();

    let args = Args::parse();
    let jetstream = args.jetstream.clone();
    let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
        args.data.clone(),
        jetstream,
        args.jetstream_force,
        Default::default(),
    )?;
    go(args, read_store, write_store, cursor, sketch_secret).await?;
    Ok(())
}

async fn go<B: StoreBackground + 'static>(
    args: Args,
    read_store: impl StoreReader + 'static + Clone,
    mut write_store: impl StoreWriter<B> + 'static,
    cursor: Option<Cursor>,
    sketch_secret: SketchSecretPrefix,
) -> anyhow::Result<()> {
    let mut whatever_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
    let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();

    println!("starting server with storage...");
    let serving = server::serve(read_store.clone());
    whatever_tasks.spawn(async move {
        serving.await.map_err(|e| {
            log::warn!("server ended: {e}");
            anyhow::anyhow!(e)
        })
    });

    if args.pause_writer {
        log::info!("not starting jetstream or the write loop.");
        for t in whatever_tasks.join_all().await {
            if let Err(e) = t {
                return Err(anyhow::anyhow!(e));
            }
        }
        return Ok(());
    }

    let batches = if args.jetstream_fixture {
        log::info!("starting with jetstream file fixture: {:?}", args.jetstream);
        file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
    } else {
        log::info!(
            "starting consumer with cursor: {cursor:?} from {:?} ago",
            cursor.map(|c| c.elapsed())
        );
        consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
    };

    let rolling = write_store
        .background_tasks(args.reroll)?
        .run(args.backfill);
    whatever_tasks.spawn(async move {
        rolling
            .await
            .inspect_err(|e| log::warn!("rollup ended: {e}"))?;
        Ok(())
    });

    consumer_tasks.spawn(async move {
        write_store
            .receive_batches(batches)
            .await
            .inspect_err(|e| log::warn!("consumer ended: {e}"))?;
        Ok(())
    });

    whatever_tasks.spawn(async move {
        do_update_stuff(read_store).await;
        log::warn!("status task ended");
        Ok(())
    });

    install_metrics_server()?;

    for (i, t) in consumer_tasks.join_all().await.iter().enumerate() {
        log::warn!("task {i} done: {t:?}");
    }

    println!("consumer tasks all completed, killing the others");
    whatever_tasks.shutdown().await;

    println!("bye!");

    Ok(())
}

fn install_metrics_server() -> anyhow::Result<()> {
    log::info!("installing metrics server...");
    let host = [0, 0, 0, 0];
    let port = 8765;
    PrometheusBuilder::new()
        .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
        .set_bucket_duration(Duration::from_secs(60))?
        .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
        .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
        .with_http_listener((host, port))
        .install()?;
    log::info!(
        "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
        host[0],
        host[1],
        host[2],
        host[3]
    );
    Ok(())
}

async fn do_update_stuff(read_store: impl StoreReader) {
    describe_gauge!(
        "persisted_cursor_age",
        Unit::Microseconds,
        "microseconds between our clock and the latest persisted event's cursor"
    );
    describe_gauge!(
        "rollup_cursor_age",
        Unit::Microseconds,
        "microseconds between our clock and the latest rollup cursor"
    );
    let started_at = std::time::SystemTime::now();
    let mut first_cursor = None;
    let mut first_rollup = None;
    let mut last_at = std::time::SystemTime::now();
    let mut last_cursor = None;
    let mut last_rollup = None;
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        read_store.update_metrics();
        match read_store.get_consumer_info().await {
            Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
            Ok(ConsumerInfo::Jetstream {
                latest_cursor,
                rollup_cursor,
                ..
            }) => {
                let now = std::time::SystemTime::now();
                let latest_cursor = latest_cursor.map(Cursor::from_raw_u64);
                let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
                backfill_info(
                    latest_cursor,
                    rollup_cursor,
                    last_cursor,
                    last_rollup,
                    last_at,
                    first_cursor,
                    first_rollup,
                    started_at,
                    now,
                );
                first_cursor = first_cursor.or(latest_cursor);
                first_rollup = first_rollup.or(rollup_cursor);
                last_cursor = latest_cursor;
                last_rollup = rollup_cursor;
                last_at = now;
            }
        }
    }
}

#[allow(clippy::too_many_arguments)]
fn backfill_info(
    latest_cursor: Option<Cursor>,
    rollup_cursor: Option<Cursor>,
    last_cursor: Option<Cursor>,
    last_rollup: Option<Cursor>,
    last_at: SystemTime,
    first_cursor: Option<Cursor>,
    first_rollup: Option<Cursor>,
    started_at: SystemTime,
    now: SystemTime,
) {
    if let Some(cursor) = latest_cursor {
        gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
    }
    if let Some(cursor) = rollup_cursor {
        gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
    }

    let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
    {
        (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
            Ok(dt) => nice_duration(dt),
            Err(e) => {
                let rev_dt = e.duration();
                format!("+{}", nice_duration(rev_dt))
            }
        },
        _ => "unknown".to_string(),
    };

    let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
        mlatest
            .zip(msince)
            .map(|(latest, since)| {
                latest
                    .duration_since(&since)
                    .unwrap_or(Duration::from_millis(1))
            })
            .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
            .unwrap_or("??".into())
    };

    let dt_real = now
        .duration_since(last_at)
        .unwrap_or(Duration::from_millis(1));

    let dt_real_total = now
        .duration_since(started_at)
        .unwrap_or(Duration::from_millis(1));

    let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
    let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);

    let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
    let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);

    log::trace!(
        "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
        latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_cursor, latest_cursor),
        rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_rollup, rollup_cursor),
    );
}
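
A note on the trace line emitted by backfill_info: jetstream cursors are microsecond timestamps (the gauges above track their age in microseconds), so the "Nx" rate it logs is just cursor time advanced divided by wall-clock time elapsed, where 1.00x means keeping pace with the live firehose and larger values mean backfill is catching up. Here is a minimal standalone sketch of that arithmetic, using plain std::time types instead of the crate's Cursor wrapper; the names are illustrative, not part of the ufos API:

use std::time::Duration;

// Catch-up multiplier: cursor time covered per unit of wall-clock time.
// 1.0 = keeping pace with the live stream, >1.0 = catching up after falling behind.
fn catch_up_rate(cursor_advanced: Duration, wall_clock_elapsed: Duration) -> f64 {
    cursor_advanced.as_secs_f64() / wall_clock_elapsed.as_secs_f64().max(f64::EPSILON)
}

fn main() {
    // e.g. the persisted cursor moved 48 stream-seconds forward during one 4-second tick:
    let rate = catch_up_rate(Duration::from_secs(48), Duration::from_secs(4));
    println!("{rate:.2}x"); // prints "12.00x": consuming at 12x real time
}

In the real code the same ratio is computed both per 4-second tick (cursor_rate, rollup_rate) and over the whole run since startup (cursor_avg, rollup_avg), for the persisted cursor and the rollup cursor respectively.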