Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 11 kB view raw
1use anyhow::{bail, Result}; 2use clap::{Parser, ValueEnum}; 3use metrics_exporter_prometheus::PrometheusBuilder; 4use std::net::SocketAddr; 5use std::num::NonZero; 6use std::path::PathBuf; 7use std::sync::{atomic::AtomicU32, Arc}; 8use std::thread; 9use std::time; 10use tokio::runtime; 11use tokio_util::sync::CancellationToken; 12 13use constellation::consumer::consume; 14use constellation::server::serve; 15#[cfg(feature = "rocks")] 16use constellation::storage::RocksStorage; 17use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats}; 18 19const MONITOR_INTERVAL: time::Duration = time::Duration::from_secs(15); 20 21/// Aggregate links in the at-mosphere 22#[derive(Parser, Debug)] 23#[command(version, about, long_about = None)] 24struct Args { 25 /// constellation server's listen address 26 #[arg(long)] 27 #[clap(default_value = "0.0.0.0:6789")] 28 bind: SocketAddr, 29 /// metrics server's listen address 30 #[arg(long)] 31 #[clap(default_value = "0.0.0.0:8765")] 32 bind_metrics: SocketAddr, 33 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 34 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 35 #[arg(short, long)] 36 jetstream: String, 37 // TODO: make this part of rocks' own sub-config? 38 /// Where to store data on disk, for backends that use disk storage 39 #[arg(short, long)] 40 data: Option<PathBuf>, 41 /// Storage backend to use 42 #[arg(short, long)] 43 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 44 backend: StorageBackend, 45 /// Initiate a database backup into this dir, if supported by the storage 46 #[arg(long)] 47 backup: Option<PathBuf>, 48 /// Start a background task to take backups every N hours 49 #[arg(long)] 50 backup_interval: Option<u64>, 51 /// Keep at most this many backups purging oldest first, requires --backup-interval 52 #[arg(long)] 53 max_old_backups: Option<usize>, 54 /// Saved jsonl from jetstream to use instead of a live subscription 55 #[arg(short, long)] 56 fixture: Option<PathBuf>, 57 /// run a scan across the target id table and write all key -> ids to id -> keys 58 #[arg(long, action)] 59 repair_target_ids: bool, 60} 61 62#[derive(Debug, Clone, ValueEnum)] 63enum StorageBackend { 64 Memory, 65 #[cfg(feature = "rocks")] 66 Rocks, 67} 68 69fn jetstream_url(provided: &str) -> String { 70 match provided { 71 "us-east-1" => "wss://jetstream1.us-east.bsky.network/subscribe".into(), 72 "us-east-2" => "wss://jetstream2.us-east.bsky.network/subscribe".into(), 73 "us-west-1" => "wss://jetstream1.us-west.bsky.network/subscribe".into(), 74 "us-west-2" => "wss://jetstream2.us-west.bsky.network/subscribe".into(), 75 custom => custom.into(), 76 } 77} 78 79fn main() -> Result<()> { 80 let args = Args::parse(); 81 82 println!("starting with storage backend: {:?}...", args.backend); 83 84 let fixture = args.fixture; 85 if let Some(ref p) = fixture { 86 println!("using fixture at {p:?}..."); 87 } 88 89 let stream = jetstream_url(&args.jetstream); 90 println!("using jetstream server {stream:?}...",); 91 92 let bind = args.bind; 93 let metrics_bind = args.bind_metrics; 94 95 let stay_alive = CancellationToken::new(); 96 97 match args.backend { 98 StorageBackend::Memory => run( 99 MemStorage::new(), 100 fixture, 101 None, 102 stream, 103 bind, 104 metrics_bind, 105 stay_alive, 106 ), 107 #[cfg(feature = "rocks")] 108 StorageBackend::Rocks => { 109 let storage_dir = args.data.clone().unwrap_or("rocks.test".into()); 110 println!("starting rocksdb..."); 111 let mut rocks = RocksStorage::new(storage_dir)?; 112 if let Some(backup_dir) = args.backup { 113 let auto_backup = match (args.backup_interval, args.max_old_backups) { 114 (Some(interval_hrs), copies) => Some((interval_hrs, copies)), 115 (None, None) => None, 116 (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"), 117 }; 118 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?; 119 } 120 println!("rocks ready."); 121 std::thread::scope(|s| { 122 if args.repair_target_ids { 123 let rocks = rocks.clone(); 124 let stay_alive = stay_alive.clone(); 125 s.spawn(move || { 126 let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive); 127 eprintln!("repair finished: {rep:?}"); 128 rep 129 }); 130 } 131 s.spawn(|| { 132 let r = run( 133 rocks, 134 fixture, 135 args.data, 136 stream, 137 bind, 138 metrics_bind, 139 stay_alive, 140 ); 141 eprintln!("run finished: {r:?}"); 142 r 143 }); 144 }); 145 Ok(()) 146 } 147 } 148} 149 150fn run( 151 mut storage: impl LinkStorage, 152 fixture: Option<PathBuf>, 153 data_dir: Option<PathBuf>, 154 stream: String, 155 bind: SocketAddr, 156 metrics_bind: SocketAddr, 157 stay_alive: CancellationToken, 158) -> Result<()> { 159 ctrlc::set_handler({ 160 let mut desperation: u8 = 0; 161 let stay_alive = stay_alive.clone(); 162 move || match desperation { 163 0 => { 164 println!("ok, shutting down..."); 165 stay_alive.cancel(); 166 desperation += 1; 167 } 168 1.. => panic!("fine, panicking!"), 169 } 170 })?; 171 172 let qsize = Arc::new(AtomicU32::new(0)); 173 174 thread::scope(|s| { 175 let readable = storage.to_readable(); 176 177 s.spawn({ 178 let qsize = qsize.clone(); 179 let stay_alive = stay_alive.clone(); 180 let staying_alive = stay_alive.clone(); 181 move || { 182 if let Err(e) = consume(storage, qsize, fixture, stream, staying_alive) { 183 eprintln!("jetstream finished with error: {e}"); 184 } 185 stay_alive.drop_guard(); 186 } 187 }); 188 189 s.spawn({ 190 let readable = readable.clone(); 191 let stay_alive = stay_alive.clone(); 192 let staying_alive = stay_alive.clone(); 193 || { 194 runtime::Builder::new_multi_thread() 195 .worker_threads(1) 196 .max_blocking_threads(2) 197 .enable_all() 198 .build() 199 .expect("axum startup") 200 .block_on(async { 201 install_metrics_server(metrics_bind)?; 202 serve(readable, bind, staying_alive).await 203 }) 204 .unwrap(); 205 stay_alive.drop_guard(); 206 } 207 }); 208 209 s.spawn(move || { // monitor thread 210 let stay_alive = stay_alive.clone(); 211 let check_alive = stay_alive.clone(); 212 213 let process_collector = metrics_process::Collector::default(); 214 process_collector.describe(); 215 metrics::describe_gauge!( 216 "storage_available", 217 metrics::Unit::Bytes, 218 "available to be allocated" 219 ); 220 metrics::describe_gauge!( 221 "storage_free", 222 metrics::Unit::Bytes, 223 "unused bytes in filesystem" 224 ); 225 if let Some(ref p) = data_dir { 226 if let Err(e) = fs4::available_space(p) { 227 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 228 } else { 229 println!("disk space monitoring should work, watching at {p:?}"); 230 } 231 } 232 233 'monitor: loop { 234 match readable.get_stats() { 235 Ok(StorageStats { dids, targetables, linking_records, .. }) => { 236 metrics::gauge!("storage.stats.dids").set(dids as f64); 237 metrics::gauge!("storage.stats.targetables").set(targetables as f64); 238 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 239 } 240 Err(e) => eprintln!("failed to get stats: {e:?}"), 241 } 242 243 process_collector.collect(); 244 if let Some(ref p) = data_dir { 245 if let Ok(avail) = fs4::available_space(p) { 246 metrics::gauge!("storage.available").set(avail as f64); 247 } 248 if let Ok(free) = fs4::free_space(p) { 249 metrics::gauge!("storage.free").set(free as f64); 250 } 251 } 252 let wait = time::Instant::now(); 253 while wait.elapsed() < MONITOR_INTERVAL { 254 thread::sleep(time::Duration::from_millis(100)); 255 if check_alive.is_cancelled() { 256 break 'monitor 257 } 258 } 259 } 260 stay_alive.drop_guard(); 261 }); 262 }); 263 264 println!("byeeee"); 265 266 Ok(()) 267} 268 269fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> { 270 println!("installing metrics server..."); 271 PrometheusBuilder::new() 272 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 273 .set_bucket_duration(time::Duration::from_secs(30))? 274 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here. 275 .set_enable_unit_suffix(true) 276 .with_http_listener(metrics_bind) 277 .install()?; 278 println!("metrics server installed! listening at {metrics_bind:?}"); 279 Ok(()) 280} 281 282#[cfg(test)] 283mod tests { 284 use constellation::consumer::get_actionable; 285 use constellation::storage::{LinkReader, LinkStorage, MemStorage}; 286 287 #[test] 288 fn test_create_like_integrated() { 289 let mut storage = MemStorage::new(); 290 291 let rec = r#"{ 292 "did":"did:plc:icprmty6ticzracr5urz4uum", 293 "time_us":1736448492661668, 294 "kind":"commit", 295 "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{ 296 "$type":"app.bsky.feed.like", 297 "createdAt":"2025-01-09T18:48:10.412Z", 298 "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"} 299 }, 300 "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"} 301 }"#.parse().unwrap(); 302 let (action, ts) = get_actionable(&rec).unwrap(); 303 storage.push(&action, ts).unwrap(); 304 assert_eq!( 305 storage 306 .get_count( 307 "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23", 308 "app.bsky.feed.like", 309 ".subject.uri" 310 ) 311 .unwrap(), 312 1 313 ); 314 } 315}