Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
// use foyer::HybridCache;
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
use metrics_exporter_prometheus::PrometheusBuilder;
use slingshot::{
    Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
};
use std::path::PathBuf;

use clap::Parser;
use tokio_util::sync::CancellationToken;

/// Slingshot record edge cache
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// where to keep disk caches
    #[arg(long)]
    cache_dir: PathBuf,
    /// memory cache size in MB
    #[arg(long, default_value_t = 64)]
    cache_memory_mb: usize,
    /// disk cache size in GB
    #[arg(long, default_value_t = 1)]
    cache_disk_gb: usize,
    /// host for HTTP server (when not using --domain)
    #[arg(long, default_value = "127.0.0.1")]
    host: String,
    /// port for HTTP server (when not using --domain)
    #[arg(long, default_value_t = 3000)]
    port: u16,
    /// port for metrics/prometheus server
    #[arg(long, default_value_t = 8765)]
    metrics_port: u16,
    /// the domain pointing to this server
    ///
    /// if present:
    /// - a did:web document will be served at /.well-known/did.json
    /// - an HTTPS cert will be automatically configured with Acme/letsencrypt
    /// - TODO: a rate-limiter will be installed
    #[arg(long)]
    domain: Option<String>,
    /// email address for letsencrypt contact
    ///
    /// recommended in production, i guess?
    #[arg(long)]
    acme_contact: Option<String>,
    /// a location to cache acme https certs
    ///
    /// only used if --domain is specified. omitting requires re-requesting certs
    /// on every restart, and letsencrypt has rate limits that are easy to hit.
    ///
    /// recommended in production, but mind the file permissions.
    #[arg(long)]
    certs: Option<PathBuf>,
    /// a web address to send healthcheck pings to every ~51s or so
    #[arg(long)]
    healthcheck: Option<String>,
}

#[tokio::main]
async fn main() -> Result<(), String> {
    tracing_subscriber::fmt::init();

    let shutdown = CancellationToken::new();

    let ctrlc_shutdown = shutdown.clone();
    ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");

    let args = Args::parse();

    if let Err(e) = install_metrics_server(args.metrics_port) {
        log::error!("failed to install metrics server: {e:?}");
    } else {
        log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
    }

    std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
        format!(
            "failed to ensure cache parent dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
        format!(
            "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    log::info!("cache dir ready at {cache_dir:?}.");

    log::info!("setting up firehose cache...");
    let cache = firehose_cache(
        cache_dir.join("./firehose"),
        args.cache_memory_mb,
        args.cache_disk_gb,
    )
    .await?;
    log::info!("firehose cache ready.");

    let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();

    log::info!("starting identity service...");
    let identity = Identity::new(cache_dir.join("./identity"))
        .await
        .map_err(|e| format!("identity setup failed: {e:?}"))?;
    log::info!("identity service ready.");
    let identity_refresher = identity.clone();
    let identity_shutdown = shutdown.clone();
    tasks.spawn(async move {
        identity_refresher.run_refresher(identity_shutdown).await?;
        Ok(())
    });

    let repo = Repo::new(identity.clone());

    let server_shutdown = shutdown.clone();
    let server_cache_handle = cache.clone();
    tasks.spawn(async move {
        serve(
            server_cache_handle,
            identity,
            repo,
            args.domain,
            args.acme_contact,
            args.certs,
            args.host,
            args.port,
            server_shutdown,
        )
        .await?;
        Ok(())
    });

    let consumer_shutdown = shutdown.clone();
    let consumer_cache = cache.clone();
    tasks.spawn(async move {
        consume(
            args.jetstream,
            None,
            args.jetstream_no_zstd,
            consumer_shutdown,
            consumer_cache,
        )
        .await?;
        Ok(())
    });

    if let Some(hc) = args.healthcheck {
        let healthcheck_shutdown = shutdown.clone();
        tasks.spawn(async move {
            healthcheck(hc, healthcheck_shutdown).await?;
            Ok(())
        });
    }

    tokio::select! {
        _ = shutdown.cancelled() => log::warn!("shutdown requested"),
        Some(r) = tasks.join_next() => {
            log::warn!("a task exited, shutting down: {r:?}");
            shutdown.cancel();
        }
    }

    tasks.spawn(async move {
        cache
            .close()
            .await
            .map_err(MainTaskError::FirehoseCacheCloseError)
    });

    tokio::select! {
        _ = async {
            while let Some(completed) = tasks.join_next().await {
                log::info!("shutdown: task completed: {completed:?}");
            }
        } => {},
        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
aborting..."); 188 tasks.shutdown().await; 189 }, 190 } 191 192 log::info!("bye!"); 193 194 Ok(()) 195} 196 197fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> { 198 log::info!("installing metrics server..."); 199 let host = [0, 0, 0, 0]; 200 PrometheusBuilder::new() 201 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 202 .set_bucket_duration(std::time::Duration::from_secs(300))? 203 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 204 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 205 .with_http_listener((host, port)) 206 .install()?; 207 log::info!( 208 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 209 host[0], 210 host[1], 211 host[2], 212 host[3] 213 ); 214 Ok(()) 215}