Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1// use foyer::HybridCache; 2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3use metrics_exporter_prometheus::PrometheusBuilder; 4use slingshot::{Identity, Repo, consume, error::MainTaskError, firehose_cache, serve}; 5use std::path::PathBuf; 6 7use clap::Parser; 8use tokio_util::sync::CancellationToken; 9 10/// Slingshot record edge cache 11#[derive(Parser, Debug, Clone)] 12#[command(version, about, long_about = None)] 13struct Args { 14 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 15 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 16 #[arg(long)] 17 jetstream: String, 18 /// don't request zstd-compressed jetstream events 19 /// 20 /// reduces CPU at the expense of more ingress bandwidth 21 #[arg(long, action)] 22 jetstream_no_zstd: bool, 23 /// where to keep disk caches 24 #[arg(long)] 25 cache_dir: PathBuf, 26 /// the domain pointing to this server 27 /// 28 /// if present: 29 /// - a did:web document will be served at /.well-known/did.json 30 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt 31 /// - TODO: a rate-limiter will be installed 32 #[arg(long)] 33 host: Option<String>, 34 /// email address for letsencrypt contact 35 /// 36 /// recommended in production, i guess? 37 #[arg(long)] 38 acme_contact: Option<String>, 39 /// a location to cache acme https certs 40 /// 41 /// only used if --host is specified. omitting requires re-requesting certs 42 /// on every restart, and letsencrypt has rate limits that are easy to hit. 43 /// 44 /// recommended in production, but mind the file permissions. 45 #[arg(long)] 46 certs: Option<PathBuf>, 47} 48 49#[tokio::main] 50async fn main() -> Result<(), String> { 51 tracing_subscriber::fmt::init(); 52 53 let shutdown = CancellationToken::new(); 54 55 let ctrlc_shutdown = shutdown.clone(); 56 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 57 58 let args = Args::parse(); 59 60 if let Err(e) = install_metrics_server() { 61 log::error!("failed to install metrics server: {e:?}"); 62 } else { 63 log::info!("metrics listening at http://0.0.0.0:8765"); 64 } 65 66 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { 67 format!( 68 "failed to ensure cache parent dir: {e:?} (dir: {:?})", 69 args.cache_dir 70 ) 71 })?; 72 let cache_dir = args.cache_dir.canonicalize().map_err(|e| { 73 format!( 74 "failed to canonicalize cache_dir: {e:?} (dir: {:?})", 75 args.cache_dir 76 ) 77 })?; 78 log::info!("cache dir ready at at {cache_dir:?}."); 79 80 log::info!("setting up firehose cache..."); 81 let cache = firehose_cache(cache_dir.join("./firehose")).await?; 82 log::info!("firehose cache ready."); 83 84 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 85 86 log::info!("starting identity service..."); 87 let identity = Identity::new(cache_dir.join("./identity")) 88 .await 89 .map_err(|e| format!("identity setup failed: {e:?}"))?; 90 log::info!("identity service ready."); 91 let identity_refresher = identity.clone(); 92 tasks.spawn(async move { 93 identity_refresher.run_refresher().await?; 94 Ok(()) 95 }); 96 97 let repo = Repo::new(identity.clone()); 98 99 let server_shutdown = shutdown.clone(); 100 let server_cache_handle = cache.clone(); 101 tasks.spawn(async move { 102 serve( 103 server_cache_handle, 104 identity, 105 repo, 106 args.host, 107 args.acme_contact, 108 args.certs, 109 server_shutdown, 110 ) 111 .await?; 112 Ok(()) 113 }); 114 115 let consumer_shutdown = shutdown.clone(); 116 tasks.spawn(async move { 117 consume( 118 args.jetstream, 119 None, 120 args.jetstream_no_zstd, 121 consumer_shutdown, 122 cache, 123 ) 124 .await?; 125 Ok(()) 126 }); 127 128 tokio::select! { 129 _ = shutdown.cancelled() => log::warn!("shutdown requested"), 130 Some(r) = tasks.join_next() => { 131 log::warn!("a task exited, shutting down: {r:?}"); 132 shutdown.cancel(); 133 } 134 } 135 136 tokio::select! { 137 _ = async { 138 while let Some(completed) = tasks.join_next().await { 139 log::info!("shutdown: task completed: {completed:?}"); 140 } 141 } => {}, 142 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { 143 log::info!("shutdown: not all tasks completed on time. aborting..."); 144 tasks.shutdown().await; 145 }, 146 } 147 148 log::info!("bye!"); 149 150 Ok(()) 151} 152 153fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 154 log::info!("installing metrics server..."); 155 let host = [0, 0, 0, 0]; 156 let port = 8765; 157 PrometheusBuilder::new() 158 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 159 .set_bucket_duration(std::time::Duration::from_secs(300))? 160 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 161 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 162 .with_http_listener((host, port)) 163 .install()?; 164 log::info!( 165 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 166 host[0], 167 host[1], 168 host[2], 169 host[3] 170 ); 171 Ok(()) 172}