forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use clap::Parser;
2use jetstream::events::Cursor;
3use metrics::{describe_gauge, gauge, Unit};
4use metrics_exporter_prometheus::PrometheusBuilder;
5use std::path::PathBuf;
6use std::time::{Duration, SystemTime};
7use ufos::consumer;
8use ufos::file_consumer;
9use ufos::server;
10use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
11use ufos::storage_fjall::FjallStorage;
12use ufos::store_types::SketchSecretPrefix;
13use ufos::{nice_duration, ConsumerInfo};
14
15#[cfg(not(target_env = "msvc"))]
16use tikv_jemallocator::Jemalloc;
17
18#[cfg(not(target_env = "msvc"))]
19#[global_allocator]
20static GLOBAL: Jemalloc = Jemalloc;
21
22/// Aggregate links in the at-mosphere
23#[derive(Parser, Debug, Clone)]
24#[command(version, about, long_about = None)]
25struct Args {
26 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
27 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
28 #[arg(long)]
29 jetstream: String,
30 /// allow changing jetstream endpoints
31 #[arg(long, action)]
32 jetstream_force: bool,
33 /// don't request zstd-compressed jetstream events
34 ///
35 /// reduces CPU at the expense of more ingress bandwidth
36 #[arg(long, action)]
37 jetstream_no_zstd: bool,
38 /// Location to store persist data to disk
39 #[arg(long)]
40 data: PathBuf,
41 /// DEBUG: don't start the jetstream consumer or its write loop
42 #[arg(long, action)]
43 pause_writer: bool,
44 /// Adjust runtime settings like background task intervals for efficient backfill
45 #[arg(long, action)]
46 backfill: bool,
47 /// DEBUG: force the rw loop to fall behind by pausing it
48 /// todo: restore this
49 #[arg(long, action)]
50 pause_rw: bool,
51 /// reset the rollup cursor, scrape through missed things in the past (backfill)
52 #[arg(long, action)]
53 reroll: bool,
54 /// DEBUG: interpret jetstream as a file fixture
55 #[arg(long, action)]
56 jetstream_fixture: bool,
57}
58
59#[tokio::main]
60async fn main() -> anyhow::Result<()> {
61 env_logger::init();
62
63 let args = Args::parse();
64 let jetstream = args.jetstream.clone();
65 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
66 args.data.clone(),
67 jetstream,
68 args.jetstream_force,
69 Default::default(),
70 )?;
71 go(args, read_store, write_store, cursor, sketch_secret).await?;
72 Ok(())
73}
74
75async fn go<B: StoreBackground>(
76 args: Args,
77 read_store: impl StoreReader + 'static + Clone,
78 mut write_store: impl StoreWriter<B> + 'static,
79 cursor: Option<Cursor>,
80 sketch_secret: SketchSecretPrefix,
81) -> anyhow::Result<()> {
82 println!("starting server with storage...");
83 let serving = server::serve(read_store.clone());
84
85 if args.pause_writer {
86 log::info!("not starting jetstream or the write loop.");
87 serving.await.map_err(|e| anyhow::anyhow!(e))?;
88 return Ok(());
89 }
90
91 let batches = if args.jetstream_fixture {
92 log::info!("starting with jestream file fixture: {:?}", args.jetstream);
93 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
94 } else {
95 log::info!(
96 "starting consumer with cursor: {cursor:?} from {:?} ago",
97 cursor.map(|c| c.elapsed())
98 );
99 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
100 };
101
102 let rolling = write_store
103 .background_tasks(args.reroll)?
104 .run(args.backfill);
105 let consuming = write_store.receive_batches(batches);
106
107 let stating = do_update_stuff(read_store);
108
109 install_metrics_server()?;
110
111 tokio::select! {
112 z = serving => log::warn!("serve task ended: {z:?}"),
113 z = rolling => log::warn!("rollup task ended: {z:?}"),
114 z = consuming => log::warn!("consuming task ended: {z:?}"),
115 z = stating => log::warn!("status task ended: {z:?}"),
116 };
117
118 println!("bye!");
119
120 Ok(())
121}
122
123fn install_metrics_server() -> anyhow::Result<()> {
124 log::info!("installing metrics server...");
125 let host = [0, 0, 0, 0];
126 let port = 8765;
127 PrometheusBuilder::new()
128 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
129 .set_bucket_duration(Duration::from_secs(60))?
130 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
131 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
132 .with_http_listener((host, port))
133 .install()?;
134 log::info!(
135 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
136 host[0],
137 host[1],
138 host[2],
139 host[3]
140 );
141 Ok(())
142}
143
144async fn do_update_stuff(read_store: impl StoreReader) {
145 describe_gauge!(
146 "persisted_cursor_age",
147 Unit::Microseconds,
148 "microseconds between our clock and the latest persisted event's cursor"
149 );
150 describe_gauge!(
151 "rollup_cursor_age",
152 Unit::Microseconds,
153 "microseconds between our clock and the latest rollup cursor"
154 );
155 let started_at = std::time::SystemTime::now();
156 let mut first_cursor = None;
157 let mut first_rollup = None;
158 let mut last_at = std::time::SystemTime::now();
159 let mut last_cursor = None;
160 let mut last_rollup = None;
161 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
162 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
163 loop {
164 interval.tick().await;
165 match read_store.get_consumer_info().await {
166 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
167 Ok(ConsumerInfo::Jetstream {
168 latest_cursor,
169 rollup_cursor,
170 ..
171 }) => {
172 let now = std::time::SystemTime::now();
173 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64);
174 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
175 backfill_info(
176 latest_cursor,
177 rollup_cursor,
178 last_cursor,
179 last_rollup,
180 last_at,
181 first_cursor,
182 first_rollup,
183 started_at,
184 now,
185 );
186 first_cursor = first_cursor.or(latest_cursor);
187 first_rollup = first_rollup.or(rollup_cursor);
188 last_cursor = latest_cursor;
189 last_rollup = rollup_cursor;
190 last_at = now;
191 }
192 }
193 }
194}
195
196#[allow(clippy::too_many_arguments)]
197fn backfill_info(
198 latest_cursor: Option<Cursor>,
199 rollup_cursor: Option<Cursor>,
200 last_cursor: Option<Cursor>,
201 last_rollup: Option<Cursor>,
202 last_at: SystemTime,
203 first_cursor: Option<Cursor>,
204 first_rollup: Option<Cursor>,
205 started_at: SystemTime,
206 now: SystemTime,
207) {
208 if let Some(cursor) = latest_cursor {
209 gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
210 }
211 if let Some(cursor) = rollup_cursor {
212 gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
213 }
214
215 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
216 {
217 (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
218 Ok(dt) => nice_duration(dt),
219 Err(e) => {
220 let rev_dt = e.duration();
221 format!("+{}", nice_duration(rev_dt))
222 }
223 },
224 _ => "unknown".to_string(),
225 };
226
227 let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
228 mlatest
229 .zip(msince)
230 .map(|(latest, since)| {
231 latest
232 .duration_since(&since)
233 .unwrap_or(Duration::from_millis(1))
234 })
235 .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
236 .unwrap_or("??".into())
237 };
238
239 let dt_real = now
240 .duration_since(last_at)
241 .unwrap_or(Duration::from_millis(1));
242
243 let dt_real_total = now
244 .duration_since(started_at)
245 .unwrap_or(Duration::from_millis(1));
246
247 let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
248 let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);
249
250 let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
251 let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);
252
253 log::trace!(
254 "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
255 latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
256 nice_dt_two_maybes(last_cursor, latest_cursor),
257 rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
258 nice_dt_two_maybes(last_rollup, rollup_cursor),
259 );
260}