Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use spacedust::consumer; 2use spacedust::delay; 3use spacedust::error::MainTaskError; 4use spacedust::removable_delay_queue::removable_delay_queue; 5use spacedust::server; 6 7use clap::Parser; 8use metrics_exporter_prometheus::PrometheusBuilder; 9use std::time::Duration; 10use tokio::sync::broadcast; 11use tokio_util::sync::CancellationToken; 12 13/// Aggregate links in the at-mosphere 14#[derive(Parser, Debug, Clone)] 15#[command(version, about, long_about = None)] 16struct Args { 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 #[arg(long)] 20 jetstream: String, 21 /// don't request zstd-compressed jetstream events 22 /// 23 /// reduces CPU at the expense of more ingress bandwidth 24 #[arg(long, action)] 25 jetstream_no_zstd: bool, 26} 27 28#[tokio::main] 29async fn main() -> Result<(), String> { 30 env_logger::init(); 31 32 // tokio broadcast keeps a single main output queue for all subscribers. 33 // each subscriber clones off a copy of an individual value for each recv. 34 // since there's no large per-client buffer, we can make this one kind of 35 // big and accommodate more slow/bursty clients. 36 // 37 // in fact, we *could* even keep lagging clients alive, inserting lag- 38 // indicating messages to their output.... but for now we'll drop them to 39 // avoid accumulating zombies. 40 // 41 // events on the channel are individual links as they are discovered. a link 42 // contains a source and a target. the target is an at-uri, so it's up to 43 // ~1KB max; source is a collection + link path, which can be more but in 44 // practice the whole link rarely approaches 1KB total. 45 // 46 // TODO: determine if a pathological case could blow this up (eg 1MB link 47 // paths + slow subscriber -> 16GiB queue) 48 let (b, _) = broadcast::channel(16_384); 49 let consumer_sender = b.clone(); 50 let (d, _) = broadcast::channel(16_384); 51 let consumer_delayed_sender = d.clone(); 52 53 let delay = Duration::from_secs(21); 54 let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay); 55 56 let shutdown = CancellationToken::new(); 57 58 let ctrlc_shutdown = shutdown.clone(); 59 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 60 61 let args = Args::parse(); 62 63 if let Err(e) = install_metrics_server() { 64 log::error!("failed to install metrics server: {e:?}"); 65 }; 66 67 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 68 69 let server_shutdown = shutdown.clone(); 70 tasks.spawn(async move { 71 server::serve(b, d, server_shutdown).await?; 72 Ok(()) 73 }); 74 75 let consumer_shutdown = shutdown.clone(); 76 tasks.spawn(async move { 77 consumer::consume( 78 consumer_sender, 79 delay_queue_sender, 80 args.jetstream, 81 None, 82 args.jetstream_no_zstd, 83 consumer_shutdown, 84 ) 85 .await?; 86 Ok(()) 87 }); 88 89 let delay_shutdown = shutdown.clone(); 90 tasks.spawn(async move { 91 delay::to_broadcast( 92 delay_queue_receiver, 93 consumer_delayed_sender, 94 delay_shutdown, 95 ) 96 .await?; 97 Ok(()) 98 }); 99 100 tokio::select! { 101 _ = shutdown.cancelled() => log::warn!("shutdown requested"), 102 Some(r) = tasks.join_next() => { 103 log::warn!("a task exited, shutting down: {r:?}"); 104 shutdown.cancel(); 105 } 106 } 107 108 tokio::select! { 109 _ = async { 110 while let Some(completed) = tasks.join_next().await { 111 log::info!("shutdown: task completed: {completed:?}"); 112 } 113 } => {}, 114 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { 115 log::info!("shutdown: not all tasks completed on time. aborting..."); 116 tasks.shutdown().await; 117 }, 118 } 119 120 log::info!("bye!"); 121 122 Ok(()) 123} 124 125fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 126 log::info!("installing metrics server..."); 127 let host = [0, 0, 0, 0]; 128 let port = 8765; 129 PrometheusBuilder::new() 130 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 131 .set_bucket_duration(std::time::Duration::from_secs(300))? 132 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 133 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 134 .with_http_listener((host, port)) 135 .install()?; 136 log::info!( 137 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 138 host[0], 139 host[1], 140 host[2], 141 host[3] 142 ); 143 Ok(()) 144}