Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
// use foyer::HybridCache;
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
use metrics_exporter_prometheus::PrometheusBuilder;
use slingshot::{
    Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
};
use std::path::PathBuf;

use clap::Parser;
use tokio_util::sync::CancellationToken;

/// Slingshot record edge cache
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// where to keep disk caches
    #[arg(long)]
    cache_dir: PathBuf,
    /// the domain pointing to this server
    ///
    /// if present:
    /// - a did:web document will be served at /.well-known/did.json
    /// - an HTTPS cert will be automatically configured with ACME/letsencrypt
    /// - TODO: a rate-limiter will be installed
    #[arg(long)]
    domain: Option<String>,
    /// email address for letsencrypt contact
    ///
    /// recommended in production, i guess?
    #[arg(long)]
    acme_contact: Option<String>,
    /// a location to cache acme https certs
    ///
    /// only used if --domain is specified. omitting requires re-requesting certs
    /// on every restart, and letsencrypt has rate limits that are easy to hit.
    ///
    /// recommended in production, but mind the file permissions.
    #[arg(long)]
    certs: Option<PathBuf>,
    /// a web address to send healthcheck pings to every ~51s or so
    #[arg(long)]
    healthcheck: Option<String>,
}

#[tokio::main]
async fn main() -> Result<(), String> {
    tracing_subscriber::fmt::init();

    let shutdown = CancellationToken::new();

    let ctrlc_shutdown = shutdown.clone();
    ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");

    let args = Args::parse();

    if let Err(e) = install_metrics_server() {
        log::error!("failed to install metrics server: {e:?}");
    } else {
        log::info!("metrics listening at http://0.0.0.0:8765");
    }

    std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
        format!(
            "failed to ensure cache parent dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
        format!(
            "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    log::info!("cache dir ready at {cache_dir:?}.");

    log::info!("setting up firehose cache...");
    let cache = firehose_cache(cache_dir.join("./firehose")).await?;
    log::info!("firehose cache ready.");

    let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();

    log::info!("starting identity service...");
    let identity = Identity::new(cache_dir.join("./identity"))
        .await
        .map_err(|e| format!("identity setup failed: {e:?}"))?;
    log::info!("identity service ready.");
    let identity_refresher = identity.clone();
    let identity_shutdown = shutdown.clone();
    tasks.spawn(async move {
        identity_refresher.run_refresher(identity_shutdown).await?;
        Ok(())
    });

    let repo = Repo::new(identity.clone());

    let server_shutdown = shutdown.clone();
    let server_cache_handle = cache.clone();
    tasks.spawn(async move {
        serve(
            server_cache_handle,
            identity,
            repo,
            args.domain,
            args.acme_contact,
            args.certs,
            server_shutdown,
        )
        .await?;
        Ok(())
    });

    let consumer_shutdown = shutdown.clone();
    let consumer_cache = cache.clone();
    tasks.spawn(async move {
        consume(
            args.jetstream,
            None,
            args.jetstream_no_zstd,
            consumer_shutdown,
            consumer_cache,
        )
        .await?;
        Ok(())
    });

    if let Some(hc) = args.healthcheck {
        let healthcheck_shutdown = shutdown.clone();
        tasks.spawn(async move {
            healthcheck(hc, healthcheck_shutdown).await?;
            Ok(())
        });
    }

    // run until ctrl-c (or another cancellation) or until any task exits on its own
    tokio::select! {
        _ = shutdown.cancelled() => log::warn!("shutdown requested"),
        Some(r) = tasks.join_next() => {
            log::warn!("a task exited, shutting down: {r:?}");
            shutdown.cancel();
        }
    }

    // flush and close the firehose cache as part of shutdown
    tasks.spawn(async move {
        cache
            .close()
            .await
            .map_err(MainTaskError::FirehoseCacheCloseError)
    });

    // give the remaining tasks up to 30s to wind down before aborting them
    tokio::select! {
        _ = async {
            while let Some(completed) = tasks.join_next().await {
                log::info!("shutdown: task completed: {completed:?}");
            }
        } => {},
        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
            log::info!("shutdown: not all tasks completed on time. aborting...");
            tasks.shutdown().await;
        },
    }

    log::info!("bye!");

    Ok(())
}

fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
    log::info!("installing metrics server...");
    let host = [0, 0, 0, 0];
    let port = 8765;
    PrometheusBuilder::new()
        .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
        .set_bucket_duration(std::time::Duration::from_secs(300))?
        .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
        .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
        .with_http_listener((host, port))
        .install()?;
    log::info!(
        "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
        host[0],
        host[1],
        host[2],
        host[3]
    );
    Ok(())
}
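For reference, a minimal sketch of how this binary might be invoked. The binary name (`slingshot`), paths, domain, and email here are illustrative assumptions, not taken from the source; per the Args struct above, only --jetstream and --cache-dir are required, and the rest are optional:

    slingshot \
      --jetstream us-east-1 \
      --cache-dir /var/cache/slingshot \
      --domain slingshot.example.com \
      --acme-contact ops@example.com \
      --certs /var/cache/slingshot/certs \
      --healthcheck https://hc.example.com/ping/slingshot

Once running, the Prometheus exporter listens on http://0.0.0.0:8765 (the host and port are hardcoded in install_metrics_server).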