forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::ClientMessage;
2use crate::error::ServerError;
3use crate::subscriber::Subscriber;
4use dropshot::{
5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging,
6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext,
7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint,
8};
9use http::{
10 Response, StatusCode,
11 header::{ORIGIN, USER_AGENT},
12};
13use metrics::{counter, histogram};
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use schemars::JsonSchema;
18use serde::{Deserialize, Serialize};
19use std::collections::HashSet;
20use tokio::sync::broadcast;
21use tokio::time::Instant;
22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig};
23use tokio_util::sync::CancellationToken;
24
25const INDEX_HTML: &str = include_str!("../static/index.html");
26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
27
28pub async fn serve(
29 b: broadcast::Sender<Arc<ClientMessage>>,
30 d: broadcast::Sender<Arc<ClientMessage>>,
31 shutdown: CancellationToken,
32) -> Result<(), ServerError> {
33 let config_logging = ConfigLogging::StderrTerminal {
34 level: ConfigLoggingLevel::Info,
35 };
36
37 let log = config_logging
38 .to_logger("example-basic")
39 .map_err(ServerError::ConfigLogError)?;
40
41 let mut api = ApiDescription::new();
42 api.register(index).unwrap();
43 api.register(favicon).unwrap();
44 api.register(openapi).unwrap();
45 api.register(subscribe).unwrap();
46
47 // TODO: put spec in a once cell / lazy lock thing?
48 let spec = Arc::new(
49 api.openapi(
50 "Spacedust",
51 env!("CARGO_PKG_VERSION")
52 .parse()
53 .inspect_err(|e| {
54 eprintln!("failed to parse cargo package version for openapi: {e:?}")
55 })
56 .unwrap_or(semver::Version::new(0, 0, 1)),
57 )
58 .description("A configurable ATProto notifications firehose.")
59 .contact_name("part of @microcosm.blue")
60 .contact_url("https://microcosm.blue")
61 .json()
62 .map_err(ServerError::OpenApiJsonFail)?,
63 );
64
65 let sub_shutdown = shutdown.clone();
66 let ctx = Context {
67 spec,
68 b,
69 d,
70 shutdown: sub_shutdown,
71 };
72
73 let server = ServerBuilder::new(api, ctx, log)
74 .config(ConfigDropshot {
75 bind_address: "0.0.0.0:9998".parse().unwrap(),
76 ..Default::default()
77 })
78 .start()?;
79
80 tokio::select! {
81 s = server.wait_for_shutdown() => {
82 s.map_err(ServerError::ServerExited)?;
83 log::info!("server shut down normally.");
84 },
85 _ = shutdown.cancelled() => {
86 log::info!("shutting down: closing server");
87 server.close().await.map_err(ServerError::BadClose)?;
88 },
89 }
90 Ok(())
91}
92
93#[derive(Debug, Clone)]
94struct Context {
95 pub spec: Arc<serde_json::Value>,
96 pub b: broadcast::Sender<Arc<ClientMessage>>,
97 pub d: broadcast::Sender<Arc<ClientMessage>>,
98 pub shutdown: CancellationToken,
99}
100
101async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
102where
103 R: HttpResponse,
104 H: Future<Output = Result<R, HttpError>>,
105 T: ServerContext,
106{
107 let start = Instant::now();
108 let result = handler.await;
109 let latency = start.elapsed();
110 let status_code = match &result {
111 Ok(response) => response.status_code(),
112 Err(e) => e.status_code.as_status(),
113 }
114 .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
115 .to_string();
116 let endpoint = ctx.endpoint.operation_id.clone();
117 let headers = ctx.request.headers();
118 let origin = headers
119 .get(ORIGIN)
120 .and_then(|v| v.to_str().ok())
121 .unwrap_or("")
122 .to_string();
123 let ua = headers
124 .get(USER_AGENT)
125 .and_then(|v| v.to_str().ok())
126 .map(|ua| {
127 if ua.starts_with("Mozilla/5.0 ") {
128 "browser"
129 } else {
130 ua
131 }
132 })
133 .unwrap_or("")
134 .to_string();
135 counter!("server_requests_total",
136 "endpoint" => endpoint.clone(),
137 "origin" => origin,
138 "ua" => ua,
139 "status_code" => status_code,
140 )
141 .increment(1);
142 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
143 result
144}
145
146use dropshot::{HttpResponseHeaders, HttpResponseOk};
147
148pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
149
150/// Helper for constructing Ok responses: return OkCors(T).into()
151/// (not happy with this yet)
152pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
153
154impl<T> From<OkCors<T>> for OkCorsResponse<T>
155where
156 T: Serialize + JsonSchema + Send + Sync,
157{
158 fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
159 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
160 res.headers_mut()
161 .insert("access-control-allow-origin", "*".parse().unwrap());
162 Ok(res)
163 }
164}
165
166// TODO: cors for HttpError
167
168/// Serve index page as html
169#[endpoint {
170 method = GET,
171 path = "/",
172 /*
173 * not useful to have this in openapi
174 */
175 unpublished = true,
176}]
177async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
178 instrument_handler(&ctx, async {
179 Ok(Response::builder()
180 .status(StatusCode::OK)
181 .header(http::header::CONTENT_TYPE, "text/html")
182 .body(INDEX_HTML.into())?)
183 })
184 .await
185}
186
187/// Serve index page as html
188#[endpoint {
189 method = GET,
190 path = "/favicon.ico",
191 /*
192 * not useful to have this in openapi
193 */
194 unpublished = true,
195}]
196async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
197 instrument_handler(&ctx, async {
198 Ok(Response::builder()
199 .status(StatusCode::OK)
200 .header(http::header::CONTENT_TYPE, "image/x-icon")
201 .body(FAVICON.to_vec().into())?)
202 })
203 .await
204}
205
206/// Meta: get the openapi spec for this api
207#[endpoint {
208 method = GET,
209 path = "/openapi",
210 /*
211 * not useful to have this in openapi
212 */
213 unpublished = true,
214}]
215async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
216 instrument_handler(&ctx, async {
217 let spec = (*ctx.context().spec).clone();
218 OkCors(spec).into()
219 })
220 .await
221}
222
/// The real type that gets deserialized
// Built from the raw query string by the `SharedExtractor` impl below, which
// uses `serde_qs`. Every field is `#[serde(default)]`, so an omitted
// parameter becomes an empty set.
// NOTE(review): how repeated keys (`wantedSubjects=a&wantedSubjects=b`) land
// in these sets depends on serde_qs's repeated-key handling — confirm.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MultiSubscribeQuery {
    // query param `wantedSubjects`: at-uris to receive links about
    #[serde(default)]
    pub wanted_subjects: HashSet<String>,
    // query param `wantedSubjectDids`: DIDs to receive links about
    #[serde(default)]
    pub wanted_subject_dids: HashSet<String>,
    // query param `wantedSources`: link sources to receive links about
    #[serde(default)]
    pub wanted_sources: HashSet<String>,
}
234/// The fake corresponding type for docs that dropshot won't freak out about a
235/// vec for
236#[derive(Deserialize, JsonSchema)]
237#[allow(dead_code)]
238#[serde(rename_all = "camelCase")]
239struct MultiSubscribeQueryForDocs {
240 /// One or more at-uris to receive links about
241 ///
242 /// The at-uri must be url-encoded
243 ///
244 /// Pass this parameter multiple times to specify multiple collections, like
245 /// `wantedSubjects=[...]&wantedSubjects=[...]`
246 pub wanted_subjects: String,
247 /// One or more DIDs to receive links about
248 ///
249 /// Pass this parameter multiple times to specify multiple collections
250 pub wanted_subject_dids: String,
251 /// One or more link sources to receive links about
252 ///
253 /// TODO: docs about link sources
254 ///
255 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri`
256 ///
257 /// Pass this parameter multiple times to specify multiple sources
258 pub wanted_sources: String,
259}
260
261// The `SharedExtractor` implementation for Query<QueryType> describes how to
262// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
263// parsing the query string to an instance of `QueryType`.
264#[async_trait]
265impl SharedExtractor for MultiSubscribeQuery {
266 async fn from_request<Context: ServerContext>(
267 ctx: &RequestContext<Context>,
268 ) -> Result<MultiSubscribeQuery, HttpError> {
269 let raw_query = ctx.request.uri().query().unwrap_or("");
270 let q = serde_qs::from_str(raw_query).map_err(|e| {
271 HttpError::for_bad_request(None, format!("unable to parse query string: {e}"))
272 })?;
273 Ok(q)
274 }
275
276 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
277 // HACK: query type switcheroo: passing MultiSubscribeQuery to
278 // `metadata` would "helpfully" panic because dropshot believes we can
279 // only have scalar types in a query.
280 //
281 // so instead we have a fake second type whose only job is to look the
282 // same as MultiSubscribeQuery exept that it has `String` instead of
283 // `Vec<String>`, which dropshot will accept, and generate ~close-enough
284 // docs for.
285 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type)
286 }
287}
288
289#[derive(Deserialize, JsonSchema)]
290#[serde(rename_all = "camelCase")]
291struct ScalarSubscribeQuery {
292 /// Bypass the 21-sec delay buffer
293 ///
294 /// By default, spacedust holds all firehose links for 21 seconds before
295 /// emitting them, to prevent quickly- undone interactions from generating
296 /// notifications.
297 ///
298 /// Setting `instant` to true bypasses this buffer, allowing faster (and
299 /// noisier) notification delivery.
300 ///
301 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l)
302 /// of links links get deleted within 21s of being created.
303 #[serde(default)]
304 pub instant: bool,
305}
306
307#[channel {
308 protocol = WEBSOCKETS,
309 path = "/subscribe",
310}]
311async fn subscribe(
312 reqctx: RequestContext<Context>,
313 query: MultiSubscribeQuery,
314 scalar_query: Query<ScalarSubscribeQuery>,
315 upgraded: WebsocketConnection,
316) -> dropshot::WebsocketChannelResult {
317 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
318 upgraded.into_inner(),
319 Role::Server,
320 Some(WebSocketConfig::default().max_message_size(
321 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream
322 )),
323 )
324 .await;
325
326 let Context { b, d, shutdown, .. } = reqctx.context();
327 let sub_token = shutdown.child_token();
328
329 let q = scalar_query.into_inner();
330 let subscription = if q.instant { b } else { d }.subscribe();
331 log::info!("starting subscriber with broadcast: instant={}", q.instant);
332
333 Subscriber::new(query, sub_token)
334 .start(ws, subscription)
335 .await
336 .map_err(|e| format!("boo: {e:?}"))?;
337
338 Ok(())
339}