//! forked from microcosm.blue/microcosm-rs
//! Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1// use foyer::HybridCache;
2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use slingshot::{Identity, Repo, consume, error::MainTaskError, firehose_cache, serve};
5use std::path::PathBuf;
6
7use clap::Parser;
8use tokio_util::sync::CancellationToken;
9
10/// Slingshot record edge cache
11#[derive(Parser, Debug, Clone)]
12#[command(version, about, long_about = None)]
13struct Args {
14 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
15 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
16 #[arg(long)]
17 jetstream: String,
18 /// don't request zstd-compressed jetstream events
19 ///
20 /// reduces CPU at the expense of more ingress bandwidth
21 #[arg(long, action)]
22 jetstream_no_zstd: bool,
23 /// where to keep disk caches
24 #[arg(long)]
25 cache_dir: PathBuf,
26 /// the domain pointing to this server
27 ///
28 /// if present:
29 /// - a did:web document will be served at /.well-known/did.json
30 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
31 /// - TODO: a rate-limiter will be installed
32 #[arg(long)]
33 host: Option<String>,
34 /// email address for letsencrypt contact
35 ///
36 /// recommended in production, i guess?
37 #[arg(long)]
38 acme_contact: Option<String>,
39 /// a location to cache acme https certs
40 ///
41 /// only used if --host is specified. omitting requires re-requesting certs
42 /// on every restart, and letsencrypt has rate limits that are easy to hit.
43 ///
44 /// recommended in production, but mind the file permissions.
45 #[arg(long)]
46 certs: Option<PathBuf>,
47}
48
49#[tokio::main]
50async fn main() -> Result<(), String> {
51 tracing_subscriber::fmt::init();
52
53 let shutdown = CancellationToken::new();
54
55 let ctrlc_shutdown = shutdown.clone();
56 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
57
58 let args = Args::parse();
59
60 if let Err(e) = install_metrics_server() {
61 log::error!("failed to install metrics server: {e:?}");
62 } else {
63 log::info!("metrics listening at http://0.0.0.0:8765");
64 }
65
66 std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
67 format!(
68 "failed to ensure cache parent dir: {e:?} (dir: {:?})",
69 args.cache_dir
70 )
71 })?;
72 let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
73 format!(
74 "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
75 args.cache_dir
76 )
77 })?;
78 log::info!("cache dir ready at at {cache_dir:?}.");
79
80 log::info!("setting up firehose cache...");
81 let cache = firehose_cache(cache_dir.join("./firehose")).await?;
82 log::info!("firehose cache ready.");
83
84 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
85
86 log::info!("starting identity service...");
87 let identity = Identity::new(cache_dir.join("./identity"))
88 .await
89 .map_err(|e| format!("identity setup failed: {e:?}"))?;
90 log::info!("identity service ready.");
91 let identity_refresher = identity.clone();
92 tasks.spawn(async move {
93 identity_refresher.run_refresher().await?;
94 Ok(())
95 });
96
97 let repo = Repo::new(identity.clone());
98
99 let server_shutdown = shutdown.clone();
100 let server_cache_handle = cache.clone();
101 tasks.spawn(async move {
102 serve(
103 server_cache_handle,
104 identity,
105 repo,
106 args.host,
107 args.acme_contact,
108 args.certs,
109 server_shutdown,
110 )
111 .await?;
112 Ok(())
113 });
114
115 let consumer_shutdown = shutdown.clone();
116 tasks.spawn(async move {
117 consume(
118 args.jetstream,
119 None,
120 args.jetstream_no_zstd,
121 consumer_shutdown,
122 cache,
123 )
124 .await?;
125 Ok(())
126 });
127
128 tokio::select! {
129 _ = shutdown.cancelled() => log::warn!("shutdown requested"),
130 Some(r) = tasks.join_next() => {
131 log::warn!("a task exited, shutting down: {r:?}");
132 shutdown.cancel();
133 }
134 }
135
136 tokio::select! {
137 _ = async {
138 while let Some(completed) = tasks.join_next().await {
139 log::info!("shutdown: task completed: {completed:?}");
140 }
141 } => {},
142 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
143 log::info!("shutdown: not all tasks completed on time. aborting...");
144 tasks.shutdown().await;
145 },
146 }
147
148 log::info!("bye!");
149
150 Ok(())
151}
152
153fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
154 log::info!("installing metrics server...");
155 let host = [0, 0, 0, 0];
156 let port = 8765;
157 PrometheusBuilder::new()
158 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
159 .set_bucket_duration(std::time::Duration::from_secs(300))?
160 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
161 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
162 .with_http_listener((host, port))
163 .install()?;
164 log::info!(
165 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
166 host[0],
167 host[1],
168 host[2],
169 host[3]
170 );
171 Ok(())
172}