// Forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use clap::Parser;
2use jetstream::events::Cursor;
3use metrics::{describe_gauge, gauge, Unit};
4use metrics_exporter_prometheus::PrometheusBuilder;
5use std::path::PathBuf;
6use std::time::{Duration, SystemTime};
7use tokio::task::JoinSet;
8use ufos::consumer;
9use ufos::file_consumer;
10use ufos::server;
11use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
12use ufos::storage_fjall::FjallStorage;
13use ufos::store_types::SketchSecretPrefix;
14use ufos::{nice_duration, ConsumerInfo};
15
16#[cfg(not(target_env = "msvc"))]
17use tikv_jemallocator::Jemalloc;
18
19#[cfg(not(target_env = "msvc"))]
20#[global_allocator]
21static GLOBAL: Jemalloc = Jemalloc;
22
/// Aggregate links in the at-mosphere
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --jetstream-fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// Allow changing jetstream endpoints
    #[arg(long, action)]
    jetstream_force: bool,
    /// Don't request zstd-compressed jetstream events
    ///
    /// Reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// Location on disk to persist data to
    #[arg(long)]
    data: PathBuf,
    /// DEBUG: don't start the jetstream consumer or its write loop
    #[arg(long, action)]
    pause_writer: bool,
    /// Adjust runtime settings like background task intervals for efficient backfill
    #[arg(long, action)]
    backfill: bool,
    /// DEBUG: force the rw loop to fall behind by pausing it
    /// todo: restore this (the flag is parsed but not currently read in this file)
    #[arg(long, action)]
    pause_rw: bool,
    /// Reset the rollup cursor, scrape through missed things in the past (backfill)
    #[arg(long, action)]
    reroll: bool,
    /// DEBUG: interpret the --jetstream value as a file fixture path
    #[arg(long, action)]
    jetstream_fixture: bool,
}
59
60#[tokio::main]
61async fn main() -> anyhow::Result<()> {
62 env_logger::init();
63
64 let args = Args::parse();
65 let jetstream = args.jetstream.clone();
66 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
67 args.data.clone(),
68 jetstream,
69 args.jetstream_force,
70 Default::default(),
71 )?;
72 go(args, read_store, write_store, cursor, sketch_secret).await?;
73 Ok(())
74}
75
76async fn go<B: StoreBackground + 'static>(
77 args: Args,
78 read_store: impl StoreReader + 'static + Clone,
79 mut write_store: impl StoreWriter<B> + 'static,
80 cursor: Option<Cursor>,
81 sketch_secret: SketchSecretPrefix,
82) -> anyhow::Result<()> {
83 let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
84
85 println!("starting server with storage...");
86 let serving = server::serve(read_store.clone());
87 tasks.spawn(async move {
88 serving.await.map_err(|e| {
89 log::warn!("server ended: {e}");
90 anyhow::anyhow!(e)
91 })
92 });
93
94 if args.pause_writer {
95 log::info!("not starting jetstream or the write loop.");
96 for t in tasks.join_all().await {
97 if let Err(e) = t {
98 return Err(anyhow::anyhow!(e));
99 }
100 }
101 return Ok(());
102 }
103
104 let batches = if args.jetstream_fixture {
105 log::info!("starting with jestream file fixture: {:?}", args.jetstream);
106 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
107 } else {
108 log::info!(
109 "starting consumer with cursor: {cursor:?} from {:?} ago",
110 cursor.map(|c| c.elapsed())
111 );
112 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
113 };
114
115 let rolling = write_store
116 .background_tasks(args.reroll)?
117 .run(args.backfill);
118 tasks.spawn(async move {
119 rolling
120 .await
121 .inspect_err(|e| log::warn!("rollup ended: {e}"))?;
122 Ok(())
123 });
124
125 tasks.spawn(async move {
126 write_store
127 .receive_batches(batches)
128 .await
129 .inspect_err(|e| log::warn!("consumer ended: {e}"))?;
130 Ok(())
131 });
132
133 tasks.spawn(async move {
134 do_update_stuff(read_store).await;
135 log::warn!("status task ended");
136 Ok(())
137 });
138
139 install_metrics_server()?;
140
141 for (i, t) in tasks.join_all().await.iter().enumerate() {
142 log::warn!("task {i} done: {t:?}");
143 }
144
145 println!("bye!");
146
147 Ok(())
148}
149
150fn install_metrics_server() -> anyhow::Result<()> {
151 log::info!("installing metrics server...");
152 let host = [0, 0, 0, 0];
153 let port = 8765;
154 PrometheusBuilder::new()
155 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
156 .set_bucket_duration(Duration::from_secs(60))?
157 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
158 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
159 .with_http_listener((host, port))
160 .install()?;
161 log::info!(
162 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
163 host[0],
164 host[1],
165 host[2],
166 host[3]
167 );
168 Ok(())
169}
170
171async fn do_update_stuff(read_store: impl StoreReader) {
172 describe_gauge!(
173 "persisted_cursor_age",
174 Unit::Microseconds,
175 "microseconds between our clock and the latest persisted event's cursor"
176 );
177 describe_gauge!(
178 "rollup_cursor_age",
179 Unit::Microseconds,
180 "microseconds between our clock and the latest rollup cursor"
181 );
182 let started_at = std::time::SystemTime::now();
183 let mut first_cursor = None;
184 let mut first_rollup = None;
185 let mut last_at = std::time::SystemTime::now();
186 let mut last_cursor = None;
187 let mut last_rollup = None;
188 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
189 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
190 loop {
191 interval.tick().await;
192 read_store.update_metrics();
193 match read_store.get_consumer_info().await {
194 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
195 Ok(ConsumerInfo::Jetstream {
196 latest_cursor,
197 rollup_cursor,
198 ..
199 }) => {
200 let now = std::time::SystemTime::now();
201 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64);
202 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
203 backfill_info(
204 latest_cursor,
205 rollup_cursor,
206 last_cursor,
207 last_rollup,
208 last_at,
209 first_cursor,
210 first_rollup,
211 started_at,
212 now,
213 );
214 first_cursor = first_cursor.or(latest_cursor);
215 first_rollup = first_rollup.or(rollup_cursor);
216 last_cursor = latest_cursor;
217 last_rollup = rollup_cursor;
218 last_at = now;
219 }
220 }
221 }
222}
223
/// Emit cursor-age gauges and a trace-level progress line comparing how fast the
/// persisted and rollup cursors advance versus wall-clock time (useful for judging
/// backfill catch-up rate, e.g. "2.00x" means catching up twice as fast as real time).
///
/// `latest_cursor`/`rollup_cursor` are the values as of `now`; `last_*` are the
/// values at `last_at` (the previous tick); `first_*` are the first values seen
/// since `started_at`.
#[allow(clippy::too_many_arguments)]
fn backfill_info(
    latest_cursor: Option<Cursor>,
    rollup_cursor: Option<Cursor>,
    last_cursor: Option<Cursor>,
    last_rollup: Option<Cursor>,
    last_at: SystemTime,
    first_cursor: Option<Cursor>,
    first_rollup: Option<Cursor>,
    started_at: SystemTime,
    now: SystemTime,
) {
    // Export how far behind our clock each cursor is (gauges are described as
    // microseconds in do_update_stuff).
    if let Some(cursor) = latest_cursor {
        gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
    }
    if let Some(cursor) = rollup_cursor {
        gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
    }

    // Human-friendly delta between two optional cursors: "unknown" if either is
    // absent; a leading "+" marks a cursor that moved backwards (later < earlier).
    let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
    {
        (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
            Ok(dt) => nice_duration(dt),
            Err(e) => {
                // duration_since fails when `later` precedes `earlier`;
                // e.duration() is the magnitude of that reversal.
                let rev_dt = e.duration();
                format!("+{}", nice_duration(rev_dt))
            }
        },
        _ => "unknown".to_string(),
    };

    // Ratio of cursor-time advanced to real time elapsed, rendered as "1.23"-style
    // text; "??" when either cursor is missing. A backwards cursor delta is clamped
    // to 1ms rather than reported negative.
    let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
        mlatest
            .zip(msince)
            .map(|(latest, since)| {
                latest
                    .duration_since(&since)
                    .unwrap_or(Duration::from_millis(1))
            })
            .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
            .unwrap_or("??".into())
    };

    // Wall-clock elapsed since the previous tick and since startup; clamped to
    // 1ms so the rate division can't divide by zero if the clock steps backwards.
    let dt_real = now
        .duration_since(last_at)
        .unwrap_or(Duration::from_millis(1));

    let dt_real_total = now
        .duration_since(started_at)
        .unwrap_or(Duration::from_millis(1));

    // Instantaneous (since last tick) and average (since start) catch-up rates.
    let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
    let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);

    let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
    let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);

    // "?" = cursor unknown; "++" = cursor appears to be ahead of our clock.
    log::trace!(
        "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
        latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_cursor, latest_cursor),
        rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
        nice_dt_two_maybes(last_rollup, rollup_cursor),
    );
}