forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1// use foyer::HybridCache;
2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use slingshot::{
5 Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6};
7use std::path::PathBuf;
8
9use clap::Parser;
10use tokio_util::sync::CancellationToken;
11
12/// Slingshot record edge cache
13#[derive(Parser, Debug, Clone)]
14#[command(version, about, long_about = None)]
15struct Args {
16 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
17 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
18 #[arg(long)]
19 jetstream: String,
20 /// don't request zstd-compressed jetstream events
21 ///
22 /// reduces CPU at the expense of more ingress bandwidth
23 #[arg(long, action)]
24 jetstream_no_zstd: bool,
25 /// where to keep disk caches
26 #[arg(long)]
27 cache_dir: PathBuf,
28 /// memory cache size in MB
29 #[arg(long, default_value_t = 64)]
30 cache_memory_mb: usize,
31 /// disk cache size in GB
32 #[arg(long, default_value_t = 1)]
33 cache_disk_gb: usize,
34 /// host for HTTP server (when not using --domain)
35 #[arg(long, default_value = "127.0.0.1")]
36 host: String,
37 /// port for HTTP server (when not using --domain)
38 #[arg(long, default_value_t = 3000)]
39 port: u16,
40 /// port for metrics/prometheus server
41 #[arg(long, default_value_t = 8765)]
42 metrics_port: u16,
43 /// the domain pointing to this server
44 ///
45 /// if present:
46 /// - a did:web document will be served at /.well-known/did.json
47 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
48 /// - TODO: a rate-limiter will be installed
49 #[arg(long)]
50 domain: Option<String>,
51 /// email address for letsencrypt contact
52 ///
53 /// recommended in production, i guess?
54 #[arg(long)]
55 acme_contact: Option<String>,
56 /// a location to cache acme https certs
57 ///
58 /// only used if --host is specified. omitting requires re-requesting certs
59 /// on every restart, and letsencrypt has rate limits that are easy to hit.
60 ///
61 /// recommended in production, but mind the file permissions.
62 #[arg(long)]
63 certs: Option<PathBuf>,
64 /// an web address to send healtcheck pings to every ~51s or so
65 #[arg(long)]
66 healthcheck: Option<String>,
67}
68
69#[tokio::main]
70async fn main() -> Result<(), String> {
71 tracing_subscriber::fmt::init();
72
73 let shutdown = CancellationToken::new();
74
75 let ctrlc_shutdown = shutdown.clone();
76 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
77
78 let args = Args::parse();
79
80 if let Err(e) = install_metrics_server(args.metrics_port) {
81 log::error!("failed to install metrics server: {e:?}");
82 } else {
83 log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
84 }
85
86 std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
87 format!(
88 "failed to ensure cache parent dir: {e:?} (dir: {:?})",
89 args.cache_dir
90 )
91 })?;
92 let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
93 format!(
94 "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
95 args.cache_dir
96 )
97 })?;
98 log::info!("cache dir ready at at {cache_dir:?}.");
99
100 log::info!("setting up firehose cache...");
101 let cache = firehose_cache(
102 cache_dir.join("./firehose"),
103 args.cache_memory_mb,
104 args.cache_disk_gb,
105 )
106 .await?;
107 log::info!("firehose cache ready.");
108
109 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
110
111 log::info!("starting identity service...");
112 let identity = Identity::new(cache_dir.join("./identity"))
113 .await
114 .map_err(|e| format!("identity setup failed: {e:?}"))?;
115 log::info!("identity service ready.");
116 let identity_refresher = identity.clone();
117 let identity_shutdown = shutdown.clone();
118 tasks.spawn(async move {
119 identity_refresher.run_refresher(identity_shutdown).await?;
120 Ok(())
121 });
122
123 let repo = Repo::new(identity.clone());
124
125 let server_shutdown = shutdown.clone();
126 let server_cache_handle = cache.clone();
127 tasks.spawn(async move {
128 serve(
129 server_cache_handle,
130 identity,
131 repo,
132 args.domain,
133 args.acme_contact,
134 args.certs,
135 args.host,
136 args.port,
137 server_shutdown,
138 )
139 .await?;
140 Ok(())
141 });
142
143 let consumer_shutdown = shutdown.clone();
144 let consumer_cache = cache.clone();
145 tasks.spawn(async move {
146 consume(
147 args.jetstream,
148 None,
149 args.jetstream_no_zstd,
150 consumer_shutdown,
151 consumer_cache,
152 )
153 .await?;
154 Ok(())
155 });
156
157 if let Some(hc) = args.healthcheck {
158 let healthcheck_shutdown = shutdown.clone();
159 tasks.spawn(async move {
160 healthcheck(hc, healthcheck_shutdown).await?;
161 Ok(())
162 });
163 }
164
165 tokio::select! {
166 _ = shutdown.cancelled() => log::warn!("shutdown requested"),
167 Some(r) = tasks.join_next() => {
168 log::warn!("a task exited, shutting down: {r:?}");
169 shutdown.cancel();
170 }
171 }
172
173 tasks.spawn(async move {
174 cache
175 .close()
176 .await
177 .map_err(MainTaskError::FirehoseCacheCloseError)
178 });
179
180 tokio::select! {
181 _ = async {
182 while let Some(completed) = tasks.join_next().await {
183 log::info!("shutdown: task completed: {completed:?}");
184 }
185 } => {},
186 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
187 log::info!("shutdown: not all tasks completed on time. aborting...");
188 tasks.shutdown().await;
189 },
190 }
191
192 log::info!("bye!");
193
194 Ok(())
195}
196
197fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
198 log::info!("installing metrics server...");
199 let host = [0, 0, 0, 0];
200 PrometheusBuilder::new()
201 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
202 .set_bucket_duration(std::time::Duration::from_secs(300))?
203 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
204 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
205 .with_http_listener((host, port))
206 .install()?;
207 log::info!(
208 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
209 host[0],
210 host[1],
211 host[2],
212 host[3]
213 );
214 Ok(())
215}