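//! Constellation, one of the microcosm atproto services: aggregates links in
//! the at-mosphere. This binary parses CLI args, picks a storage backend
//! (in-memory, or RocksDB when the `rocks` feature is enabled), then runs the
//! jetstream consumer, the HTTP API server, and a metrics/monitoring loop
//! until shutdown is requested.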
use anyhow::{bail, Result};
use clap::{Parser, ValueEnum};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::{atomic::AtomicU32, Arc};
use std::thread;
use std::time;
use tokio::runtime;
use tokio_util::sync::CancellationToken;

use constellation::consumer::consume;
use constellation::server::serve;
#[cfg(feature = "rocks")]
use constellation::storage::RocksStorage;
use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats};

const MONITOR_INTERVAL: time::Duration = time::Duration::from_secs(15);

/// Aggregate links in the at-mosphere
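///
/// Example invocation (illustrative; assumes the binary is named
/// `constellation`, matching the crate):
///
/// ```text
/// constellation --jetstream us-east-1 --backend memory
/// ```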
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// constellation server's listen address
    #[arg(long)]
    #[clap(default_value = "0.0.0.0:6789")]
    bind: SocketAddr,
    /// metrics server's listen address
    #[arg(long)]
    #[clap(default_value = "0.0.0.0:8765")]
    bind_metrics: SocketAddr,
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(short, long)]
    jetstream: String,
    // TODO: make this part of rocks' own sub-config?
    /// Where to store data on disk, for backends that use disk storage
    #[arg(short, long)]
    data: Option<PathBuf>,
    /// Storage backend to use
    #[arg(short, long)]
    #[clap(value_enum, default_value_t = StorageBackend::Memory)]
    backend: StorageBackend,
    /// Initiate a database backup into this dir, if supported by the storage backend
    #[arg(long)]
    backup: Option<PathBuf>,
    /// Start a background task to take backups every N hours
    #[arg(long)]
    backup_interval: Option<u64>,
    /// Keep at most this many backups, purging the oldest first; requires --backup-interval
    #[arg(long)]
    max_old_backups: Option<usize>,
    /// Saved jsonl from jetstream to use instead of a live subscription
    #[arg(short, long)]
    fixture: Option<PathBuf>,
    /// Run a scan across the target id table and write all key -> ids to id -> keys
    #[arg(long, action)]
    repair_target_ids: bool,
}

#[derive(Debug, Clone, ValueEnum)]
enum StorageBackend {
    Memory,
    #[cfg(feature = "rocks")]
    Rocks,
}

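/// Expand a known Jetstream region shorthand into its full wss:// subscribe URL;
/// any other value is passed through unchanged as a custom endpoint.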
fn jetstream_url(provided: &str) -> String {
    match provided {
        "us-east-1" => "wss://jetstream1.us-east.bsky.network/subscribe".into(),
        "us-east-2" => "wss://jetstream2.us-east.bsky.network/subscribe".into(),
        "us-west-1" => "wss://jetstream1.us-west.bsky.network/subscribe".into(),
        "us-west-2" => "wss://jetstream2.us-west.bsky.network/subscribe".into(),
        custom => custom.into(),
    }
}

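/// Parse CLI args, set up the selected storage backend, and hand off to `run`.
/// The RocksDB path also wires up optional backups and the target-id repair scan.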
fn main() -> Result<()> {
    let args = Args::parse();

    println!("starting with storage backend: {:?}...", args.backend);

    let fixture = args.fixture;
    if let Some(ref p) = fixture {
        println!("using fixture at {p:?}...");
    }

    let stream = jetstream_url(&args.jetstream);
    println!("using jetstream server {stream:?}...");

    let bind = args.bind;
    let metrics_bind = args.bind_metrics;

    let stay_alive = CancellationToken::new();

    match args.backend {
        StorageBackend::Memory => run(
            MemStorage::new(),
            fixture,
            None,
            stream,
            bind,
            metrics_bind,
            stay_alive,
        ),
        #[cfg(feature = "rocks")]
        StorageBackend::Rocks => {
            let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
            println!("starting rocksdb...");
            let mut rocks = RocksStorage::new(storage_dir)?;
            if let Some(backup_dir) = args.backup {
                let auto_backup = match (args.backup_interval, args.max_old_backups) {
                    (Some(interval_hrs), copies) => Some((interval_hrs, copies)),
                    (None, None) => None,
                    (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"),
                };
                rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
            }
            println!("rocks ready.");
            std::thread::scope(|s| {
                if args.repair_target_ids {
                    let rocks = rocks.clone();
                    let stay_alive = stay_alive.clone();
                    s.spawn(move || {
                        let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
                        eprintln!("repair finished: {rep:?}");
                        rep
                    });
                }
                s.spawn(|| {
                    let r = run(
                        rocks,
                        fixture,
                        args.data,
                        stream,
                        bind,
                        metrics_bind,
                        stay_alive,
                    );
                    eprintln!("run finished: {r:?}");
                    r
                });
            });
            Ok(())
        }
    }
}

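/// Wire everything together: a ctrl-c handler, the jetstream consumer thread,
/// the HTTP server thread (which also hosts the Prometheus exporter), and a
/// monitoring thread that reports storage and disk-space gauges. Blocks until
/// every thread has exited after the cancellation token fires.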
fn run(
    mut storage: impl LinkStorage,
    fixture: Option<PathBuf>,
    data_dir: Option<PathBuf>,
    stream: String,
    bind: SocketAddr,
    metrics_bind: SocketAddr,
    stay_alive: CancellationToken,
) -> Result<()> {
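    // The first ctrl-c requests a graceful shutdown by cancelling the shared
    // token; a second ctrl-c panics to force the process down if shutdown hangs.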
    ctrlc::set_handler({
        let mut desperation: u8 = 0;
        let stay_alive = stay_alive.clone();
        move || match desperation {
            0 => {
                println!("ok, shutting down...");
                stay_alive.cancel();
                desperation += 1;
            }
            1.. => panic!("fine, panicking!"),
        }
    })?;

    let qsize = Arc::new(AtomicU32::new(0));

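    // Three scoped threads below: the jetstream consumer, the HTTP/metrics
    // server, and a stats monitor. The scope blocks until all of them finish.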
    thread::scope(|s| {
        let readable = storage.to_readable();

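        // Consumer thread: subscribes to jetstream (or replays the fixture) and
        // writes incoming links into storage; cancels the token when it exits.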
        s.spawn({
            let qsize = qsize.clone();
            let stay_alive = stay_alive.clone();
            let staying_alive = stay_alive.clone();
            move || {
                if let Err(e) = consume(storage, qsize, fixture, stream, staying_alive) {
                    eprintln!("jetstream finished with error: {e}");
                }
                stay_alive.drop_guard();
            }
        });

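        // Server thread: runs a small tokio runtime hosting the HTTP API and
        // the Prometheus metrics exporter; also cancels the token on exit.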
        s.spawn({
            let readable = readable.clone();
            let stay_alive = stay_alive.clone();
            let staying_alive = stay_alive.clone();
            || {
                runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .max_blocking_threads(2)
                    .enable_all()
                    .build()
                    .expect("axum startup")
                    .block_on(async {
                        install_metrics_server(metrics_bind)?;
                        serve(readable, bind, staying_alive).await
                    })
                    .unwrap();
                stay_alive.drop_guard();
            }
        });

        s.spawn(move || { // monitor thread
            let stay_alive = stay_alive.clone();
            let check_alive = stay_alive.clone();

            let process_collector = metrics_process::Collector::default();
            process_collector.describe();
            metrics::describe_gauge!(
                "storage_available",
                metrics::Unit::Bytes,
                "available to be allocated"
            );
            metrics::describe_gauge!(
                "storage_free",
                metrics::Unit::Bytes,
                "unused bytes in filesystem"
            );
            if let Some(ref p) = data_dir {
                if let Err(e) = fs4::available_space(p) {
                    eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}");
                } else {
                    println!("disk space monitoring should work, watching at {p:?}");
                }
            }

            'monitor: loop {
                match readable.get_stats() {
                    Ok(StorageStats {
                        dids,
                        targetables,
                        linking_records,
                        ..
                    }) => {
                        metrics::gauge!("storage.stats.dids").set(dids as f64);
                        metrics::gauge!("storage.stats.targetables").set(targetables as f64);
                        metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
                    }
                    Err(e) => eprintln!("failed to get stats: {e:?}"),
                }

                process_collector.collect();
                if let Some(ref p) = data_dir {
                    if let Ok(avail) = fs4::available_space(p) {
                        metrics::gauge!("storage.available").set(avail as f64);
                    }
                    if let Ok(free) = fs4::free_space(p) {
                        metrics::gauge!("storage.free").set(free as f64);
                    }
                }
                let wait = time::Instant::now();
                while wait.elapsed() < MONITOR_INTERVAL {
                    thread::sleep(time::Duration::from_millis(100));
                    if check_alive.is_cancelled() {
                        break 'monitor;
                    }
                }
            }
            stay_alive.drop_guard();
        });
    });

    println!("byeeee");

    Ok(())
}

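/// Install the Prometheus recorder and exporter with their own HTTP listener,
/// so metrics can be scraped separately from the main API server.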
fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> {
    println!("installing metrics server...");
    PrometheusBuilder::new()
        .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
        .set_bucket_duration(time::Duration::from_secs(30))?
        .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
        .set_enable_unit_suffix(true)
        .with_http_listener(metrics_bind)
        .install()?;
    println!("metrics server installed! listening at {metrics_bind:?}");
    Ok(())
}

#[cfg(test)]
mod tests {
    use constellation::consumer::get_actionable;
    use constellation::storage::{LinkReader, LinkStorage, MemStorage};

    #[test]
    fn test_create_like_integrated() {
        let mut storage = MemStorage::new();

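        // A captured jetstream commit event creating an app.bsky.feed.like
        // record, used as the raw JSON fixture for this test.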
        let rec = r#"{
            "did":"did:plc:icprmty6ticzracr5urz4uum",
            "time_us":1736448492661668,
            "kind":"commit",
            "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{
                "$type":"app.bsky.feed.like",
                "createdAt":"2025-01-09T18:48:10.412Z",
                "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"}
            },
            "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"}
        }"#.parse().unwrap();
        let (action, ts) = get_actionable(&rec).unwrap();
        storage.push(&action, ts).unwrap();
        assert_eq!(
            storage
                .get_count(
                    "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23",
                    "app.bsky.feed.like",
                    ".subject.uri"
                )
                .unwrap(),
            1
        );
    }
}