// use foyer::HybridCache;
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
use metrics_exporter_prometheus::PrometheusBuilder;
use slingshot::{
    Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
};
use std::path::PathBuf;

use clap::Parser;
use tokio_util::sync::CancellationToken;

/// Slingshot record edge cache
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    #[arg(long)]
    jetstream: String,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
    /// where to keep disk caches
    #[arg(long)]
    cache_dir: PathBuf,
    /// the domain pointing to this server
    ///
    /// if present:
    /// - a did:web document will be served at /.well-known/did.json
    /// - HTTPS certs will be automatically configured with ACME/letsencrypt
    /// - TODO: a rate-limiter will be installed
    #[arg(long)]
    domain: Option<String>,
    /// email address for letsencrypt contact
    ///
    /// recommended in production, i guess?
    #[arg(long)]
    acme_contact: Option<String>,
    /// a location to cache acme https certs
    ///
    /// only used if --domain is specified. omitting requires re-requesting certs
    /// on every restart, and letsencrypt has rate limits that are easy to hit.
    ///
    /// recommended in production, but mind the file permissions.
    #[arg(long)]
    certs: Option<PathBuf>,
    /// a web address to send healthcheck pings to every ~51s or so
    #[arg(long)]
    healthcheck: Option<String>,
}

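// Startup order: parse args, install the Prometheus exporter, prepare the on-disk
// caches, then spawn the identity refresher, HTTP server, jetstream consumer, and
// optional healthcheck tasks. Everything shares one CancellationToken for shutdown.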
#[tokio::main]
async fn main() -> Result<(), String> {
    tracing_subscriber::fmt::init();

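    // one cancellation token is shared by every task; ctrl-c (or any task exiting,
    // below) cancels it to trigger a coordinated shutdown.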
    let shutdown = CancellationToken::new();

    let ctrlc_shutdown = shutdown.clone();
    ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");

    let args = Args::parse();

    if let Err(e) = install_metrics_server() {
        log::error!("failed to install metrics server: {e:?}");
    } else {
        log::info!("metrics listening at http://0.0.0.0:8765");
    }

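    // make sure the cache directory exists and resolve it to an absolute path so the
    // disk caches below get stable locations.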
    std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
        format!(
            "failed to ensure cache parent dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
        format!(
            "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
            args.cache_dir
        )
    })?;
    log::info!("cache dir ready at {cache_dir:?}.");

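    // the firehose cache holds records ingested from jetstream, kept under
    // <cache_dir>/firehose.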
    log::info!("setting up firehose cache...");
    let cache = firehose_cache(cache_dir.join("./firehose")).await?;
    log::info!("firehose cache ready.");

    let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();

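    // the identity service keeps its on-disk state under <cache_dir>/identity and runs
    // a background refresher task until shutdown.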
    log::info!("starting identity service...");
    let identity = Identity::new(cache_dir.join("./identity"))
        .await
        .map_err(|e| format!("identity setup failed: {e:?}"))?;
    log::info!("identity service ready.");
    let identity_refresher = identity.clone();
    let identity_shutdown = shutdown.clone();
    tasks.spawn(async move {
        identity_refresher.run_refresher(identity_shutdown).await?;
        Ok(())
    });

    let repo = Repo::new(identity.clone());

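    // HTTP server task: handles requests using the firehose cache, identity, and repo,
    // plus the did:web document and ACME certs when --domain is set.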
    let server_shutdown = shutdown.clone();
    let server_cache_handle = cache.clone();
    tasks.spawn(async move {
        serve(
            server_cache_handle,
            identity,
            repo,
            args.domain,
            args.acme_contact,
            args.certs,
            server_shutdown,
        )
        .await?;
        Ok(())
    });

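    // jetstream consumer task: connects to the configured jetstream server and feeds
    // events into the firehose cache until shutdown.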
    let consumer_shutdown = shutdown.clone();
    let consumer_cache = cache.clone();
    tasks.spawn(async move {
        consume(
            args.jetstream,
            None,
            args.jetstream_no_zstd,
            consumer_shutdown,
            consumer_cache,
        )
        .await?;
        Ok(())
    });

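    // optional healthcheck pinger (see --healthcheck).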
    if let Some(hc) = args.healthcheck {
        let healthcheck_shutdown = shutdown.clone();
        tasks.spawn(async move {
            healthcheck(hc, healthcheck_shutdown).await?;
            Ok(())
        });
    }

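    // run until ctrl-c cancels the token or any task exits; either way, cancel the
    // rest so they can shut down cleanly.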
    tokio::select! {
        _ = shutdown.cancelled() => log::warn!("shutdown requested"),
        Some(r) = tasks.join_next() => {
            log::warn!("a task exited, shutting down: {r:?}");
            shutdown.cancel();
        }
    }

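    // close the firehose cache as part of shutdown; it gets joined along with the
    // remaining tasks below.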
    tasks.spawn(async move {
        cache
            .close()
            .await
            .map_err(MainTaskError::FirehoseCacheCloseError)
    });

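    // give the remaining tasks up to 30 seconds to finish, then abort whatever is left.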
    tokio::select! {
        _ = async {
            while let Some(completed) = tasks.join_next().await {
                log::info!("shutdown: task completed: {completed:?}");
            }
        } => {},
        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
            log::info!("shutdown: not all tasks completed on time. aborting...");
            tasks.shutdown().await;
        },
    }

    log::info!("bye!");

    Ok(())
}

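// Prometheus exporter on 0.0.0.0:8765; histograms keep 12 five-minute buckets, so
// roughly the last hour of data.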
fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
    log::info!("installing metrics server...");
    let host = [0, 0, 0, 0];
    let port = 8765;
    PrometheusBuilder::new()
        .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
        .set_bucket_duration(std::time::Duration::from_secs(300))?
        .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
        .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
        .with_http_listener((host, port))
        .install()?;
    log::info!(
        "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
        host[0],
        host[1],
        host[2],
        host[3]
    );
    Ok(())
}