//! Forked from microcosm.blue/microcosm-rs
//! Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

use clap::Parser;
use jetstream::events::Cursor;
use metrics::{describe_gauge, gauge, Unit};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use tokio::task::JoinSet;
use ufos::consumer;
use ufos::file_consumer;
use ufos::server;
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use ufos::storage_fjall::FjallStorage;
use ufos::store_types::SketchSecretPrefix;
use ufos::{nice_duration, ConsumerInfo};

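// Use jemalloc as the global allocator on non-MSVC targets; the default system
// allocator stays in place on Windows/MSVC, where jemalloc isn't supported.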
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// Aggregate links in the at-mosphere
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --jetstream-fixture). Provide either a wss:// URL or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// allow changing jetstream endpoints
    #[arg(long, action)]
    jetstream_force: bool,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// Location on disk to persist data
    #[arg(long)]
    data: PathBuf,
    /// DEBUG: don't start the jetstream consumer or its write loop
    #[arg(long, action)]
    pause_writer: bool,
    /// Adjust runtime settings like background task intervals for efficient backfill
    #[arg(long, action)]
    backfill: bool,
    /// DEBUG: force the rw loop to fall behind by pausing it
    /// todo: restore this
    #[arg(long, action)]
    pause_rw: bool,
    /// reset the rollup cursor and scrape through missed things in the past (backfill)
    #[arg(long, action)]
    reroll: bool,
    /// DEBUG: interpret jetstream as a file fixture
    #[arg(long, action)]
    jetstream_fixture: bool,
}

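/// Entrypoint: parse CLI args, open the fjall-backed store, and run [`go`].
///
/// Example invocation (hypothetical binary name and data path, for illustration only):
/// `ufos --jetstream us-east-1 --data /path/to/data`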
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();

    let args = Args::parse();
    let jetstream = args.jetstream.clone();
    let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
        args.data.clone(),
        jetstream,
        args.jetstream_force,
        Default::default(),
    )?;
    go(args, read_store, write_store, cursor, sketch_secret).await?;
    Ok(())
}

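/// Wire everything together and run it: the HTTP API server over the read store,
/// the jetstream (or file-fixture) consumer feeding batches into the write store,
/// the store's rollup/background task, a periodic status task, and the Prometheus
/// exporter. Returns once the consumer-side tasks finish, shutting down the rest.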
async fn go<B: StoreBackground + 'static>(
    args: Args,
    read_store: impl StoreReader + 'static + Clone,
    mut write_store: impl StoreWriter<B> + 'static,
    cursor: Option<Cursor>,
    sketch_secret: SketchSecretPrefix,
) -> anyhow::Result<()> {
    let mut whatever_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
    let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();

    println!("starting server with storage...");
    let serving = server::serve(read_store.clone());
    whatever_tasks.spawn(async move {
        serving.await.map_err(|e| {
            log::warn!("server ended: {e}");
            anyhow::anyhow!(e)
        })
    });

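    // DEBUG path: skip ingest entirely and just keep the already-spawned tasks
    // (the API server) running until they exit.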
    if args.pause_writer {
        log::info!("not starting jetstream or the write loop.");
        for t in whatever_tasks.join_all().await {
            if let Err(e) = t {
                return Err(anyhow::anyhow!(e));
            }
        }
        return Ok(());
    }

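    // Pick the event source: a local file fixture (for debugging) or a live
    // jetstream connection resumed from the persisted cursor.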
    let batches = if args.jetstream_fixture {
        log::info!("starting with jetstream file fixture: {:?}", args.jetstream);
        file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
    } else {
        log::info!(
            "starting consumer with cursor: {cursor:?} from {:?} ago",
            cursor.map(|c| c.elapsed())
        );
        consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
    };

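    // Run the store's rollup/background maintenance alongside everything else.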
    let rolling = write_store
        .background_tasks(args.reroll)?
        .run(args.backfill);
    whatever_tasks.spawn(async move {
        rolling
            .await
            .inspect_err(|e| log::warn!("rollup ended: {e}"))?;
        Ok(())
    });

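    // The write loop: apply incoming event batches to the write store.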
    consumer_tasks.spawn(async move {
        write_store
            .receive_batches(batches)
            .await
            .inspect_err(|e| log::warn!("consumer ended: {e}"))?;
        Ok(())
    });

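    // Periodic status logging and cursor-age metrics.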
    whatever_tasks.spawn(async move {
        do_update_stuff(read_store).await;
        log::warn!("status task ended");
        Ok(())
    });

    install_metrics_server()?;

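    // Block until the consumer-side tasks finish, then tear down the rest.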
    for (i, t) in consumer_tasks.join_all().await.iter().enumerate() {
        log::warn!("task {i} done: {t:?}");
    }

    println!("consumer tasks all completed, killing the others");
    whatever_tasks.shutdown().await;

    println!("bye!");

    Ok(())
}

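/// Install a Prometheus metrics exporter with an HTTP listener on 0.0.0.0:8765.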
fn install_metrics_server() -> anyhow::Result<()> {
    log::info!("installing metrics server...");
    let host = [0, 0, 0, 0];
    let port = 8765;
    PrometheusBuilder::new()
        .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
        .set_bucket_duration(Duration::from_secs(60))?
        .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
        .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
        .with_http_listener((host, port))
        .install()?;
    log::info!(
        "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
        host[0],
        host[1],
        host[2],
        host[3]
    );
    Ok(())
}

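/// Every few seconds, refresh the store's metrics and report how far the
/// persisted and rollup cursors lag behind the current wall clock.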
async fn do_update_stuff(read_store: impl StoreReader) {
    describe_gauge!(
        "persisted_cursor_age",
        Unit::Microseconds,
        "microseconds between our clock and the latest persisted event's cursor"
    );
    describe_gauge!(
        "rollup_cursor_age",
        Unit::Microseconds,
        "microseconds between our clock and the latest rollup cursor"
    );
    let started_at = std::time::SystemTime::now();
    let mut first_cursor = None;
    let mut first_rollup = None;
    let mut last_at = std::time::SystemTime::now();
    let mut last_cursor = None;
    let mut last_rollup = None;
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        read_store.update_metrics();
        match read_store.get_consumer_info().await {
            Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
            Ok(ConsumerInfo::Jetstream {
                latest_cursor,
                rollup_cursor,
                ..
            }) => {
                let now = std::time::SystemTime::now();
                let latest_cursor = latest_cursor.map(Cursor::from_raw_u64);
                let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
                backfill_info(
                    latest_cursor,
                    rollup_cursor,
                    last_cursor,
                    last_rollup,
                    last_at,
                    first_cursor,
                    first_rollup,
                    started_at,
                    now,
                );
                first_cursor = first_cursor.or(latest_cursor);
                first_rollup = first_rollup.or(rollup_cursor);
                last_cursor = latest_cursor;
                last_rollup = rollup_cursor;
                last_at = now;
            }
        }
    }
}

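/// Update the cursor-age gauges and trace-log catch-up progress: how far behind
/// each cursor is, plus how much cursor time advanced per wall-clock second since
/// the last tick and since startup.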
#[allow(clippy::too_many_arguments)]
fn backfill_info(
    latest_cursor: Option<Cursor>,
    rollup_cursor: Option<Cursor>,
    last_cursor: Option<Cursor>,
    last_rollup: Option<Cursor>,
    last_at: SystemTime,
    first_cursor: Option<Cursor>,
    first_rollup: Option<Cursor>,
    started_at: SystemTime,
    now: SystemTime,
) {
    if let Some(cursor) = latest_cursor {
        gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
    }
    if let Some(cursor) = rollup_cursor {
        gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
    }

    let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
    {
        (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
            Ok(dt) => nice_duration(dt),
            Err(e) => {
                let rev_dt = e.duration();
                format!("+{}", nice_duration(rev_dt))
            }
        },
        _ => "unknown".to_string(),
    };

    let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
        mlatest
            .zip(msince)
            .map(|(latest, since)| {
                latest
                    .duration_since(&since)
                    .unwrap_or(Duration::from_millis(1))
            })
            .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
            .unwrap_or("??".into())
    };

    let dt_real = now
        .duration_since(last_at)
        .unwrap_or(Duration::from_millis(1));

    let dt_real_total = now
        .duration_since(started_at)
        .unwrap_or(Duration::from_millis(1));

    let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
    let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);

    let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
    let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);

    log::trace!(
        "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
        latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_cursor, latest_cursor),
        rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_rollup, rollup_cursor),
    );
}