forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use spacedust::consumer;
2use spacedust::delay;
3use spacedust::error::MainTaskError;
4use spacedust::removable_delay_queue::removable_delay_queue;
5use spacedust::server;
6
7use clap::Parser;
8use metrics_exporter_prometheus::PrometheusBuilder;
9use std::time::Duration;
10use tokio::sync::broadcast;
11use tokio_util::sync::CancellationToken;
12
13/// Aggregate links in the at-mosphere
14#[derive(Parser, Debug, Clone)]
15#[command(version, about, long_about = None)]
16struct Args {
17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
19 #[arg(long)]
20 jetstream: String,
21 /// don't request zstd-compressed jetstream events
22 ///
23 /// reduces CPU at the expense of more ingress bandwidth
24 #[arg(long, action)]
25 jetstream_no_zstd: bool,
26}
27
28#[tokio::main]
29async fn main() -> Result<(), String> {
30 env_logger::init();
31
32 // tokio broadcast keeps a single main output queue for all subscribers.
33 // each subscriber clones off a copy of an individual value for each recv.
34 // since there's no large per-client buffer, we can make this one kind of
35 // big and accommodate more slow/bursty clients.
36 //
37 // in fact, we *could* even keep lagging clients alive, inserting lag-
38 // indicating messages to their output.... but for now we'll drop them to
39 // avoid accumulating zombies.
40 //
41 // events on the channel are individual links as they are discovered. a link
42 // contains a source and a target. the target is an at-uri, so it's up to
43 // ~1KB max; source is a collection + link path, which can be more but in
44 // practice the whole link rarely approaches 1KB total.
45 //
46 // TODO: determine if a pathological case could blow this up (eg 1MB link
47 // paths + slow subscriber -> 16GiB queue)
48 let (b, _) = broadcast::channel(16_384);
49 let consumer_sender = b.clone();
50 let (d, _) = broadcast::channel(16_384);
51 let consumer_delayed_sender = d.clone();
52
53 let delay = Duration::from_secs(21);
54 let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay);
55
56 let shutdown = CancellationToken::new();
57
58 let ctrlc_shutdown = shutdown.clone();
59 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
60
61 let args = Args::parse();
62
63 if let Err(e) = install_metrics_server() {
64 log::error!("failed to install metrics server: {e:?}");
65 };
66
67 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
68
69 let server_shutdown = shutdown.clone();
70 tasks.spawn(async move {
71 server::serve(b, d, server_shutdown).await?;
72 Ok(())
73 });
74
75 let consumer_shutdown = shutdown.clone();
76 tasks.spawn(async move {
77 consumer::consume(
78 consumer_sender,
79 delay_queue_sender,
80 args.jetstream,
81 None,
82 args.jetstream_no_zstd,
83 consumer_shutdown,
84 )
85 .await?;
86 Ok(())
87 });
88
89 let delay_shutdown = shutdown.clone();
90 tasks.spawn(async move {
91 delay::to_broadcast(
92 delay_queue_receiver,
93 consumer_delayed_sender,
94 delay_shutdown,
95 )
96 .await?;
97 Ok(())
98 });
99
100 tokio::select! {
101 _ = shutdown.cancelled() => log::warn!("shutdown requested"),
102 Some(r) = tasks.join_next() => {
103 log::warn!("a task exited, shutting down: {r:?}");
104 shutdown.cancel();
105 }
106 }
107
108 tokio::select! {
109 _ = async {
110 while let Some(completed) = tasks.join_next().await {
111 log::info!("shutdown: task completed: {completed:?}");
112 }
113 } => {},
114 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
115 log::info!("shutdown: not all tasks completed on time. aborting...");
116 tasks.shutdown().await;
117 },
118 }
119
120 log::info!("bye!");
121
122 Ok(())
123}
124
125fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
126 log::info!("installing metrics server...");
127 let host = [0, 0, 0, 0];
128 let port = 8765;
129 PrometheusBuilder::new()
130 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
131 .set_bucket_duration(std::time::Duration::from_secs(300))?
132 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
133 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
134 .with_http_listener((host, port))
135 .install()?;
136 log::info!(
137 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
138 host[0],
139 host[1],
140 host[2],
141 host[3]
142 );
143 Ok(())
144}