Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at pocket 11 kB view raw
1use crate::ClientMessage; 2use crate::error::ServerError; 3use crate::subscriber::Subscriber; 4use dropshot::{ 5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging, 6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext, 7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint, 8}; 9use http::{ 10 Response, StatusCode, 11 header::{ORIGIN, USER_AGENT}, 12}; 13use metrics::{counter, histogram}; 14use std::sync::Arc; 15 16use async_trait::async_trait; 17use schemars::JsonSchema; 18use serde::{Deserialize, Serialize}; 19use std::collections::HashSet; 20use tokio::sync::broadcast; 21use tokio::time::Instant; 22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig}; 23use tokio_util::sync::CancellationToken; 24 25const INDEX_HTML: &str = include_str!("../static/index.html"); 26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 27 28pub async fn serve( 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 shutdown: CancellationToken, 32) -> Result<(), ServerError> { 33 let config_logging = ConfigLogging::StderrTerminal { 34 level: ConfigLoggingLevel::Info, 35 }; 36 37 let log = config_logging 38 .to_logger("example-basic") 39 .map_err(ServerError::ConfigLogError)?; 40 41 let mut api = ApiDescription::new(); 42 api.register(index).unwrap(); 43 api.register(favicon).unwrap(); 44 api.register(openapi).unwrap(); 45 api.register(subscribe).unwrap(); 46 47 // TODO: put spec in a once cell / lazy lock thing? 48 let spec = Arc::new( 49 api.openapi( 50 "Spacedust", 51 env!("CARGO_PKG_VERSION") 52 .parse() 53 .inspect_err(|e| { 54 eprintln!("failed to parse cargo package version for openapi: {e:?}") 55 }) 56 .unwrap_or(semver::Version::new(0, 0, 1)), 57 ) 58 .description("A configurable ATProto notifications firehose.") 59 .contact_name("part of @microcosm.blue") 60 .contact_url("https://microcosm.blue") 61 .json() 62 .map_err(ServerError::OpenApiJsonFail)?, 63 ); 64 65 let sub_shutdown = shutdown.clone(); 66 let ctx = Context { 67 spec, 68 b, 69 d, 70 shutdown: sub_shutdown, 71 }; 72 73 let server = ServerBuilder::new(api, ctx, log) 74 .config(ConfigDropshot { 75 bind_address: "0.0.0.0:9998".parse().unwrap(), 76 ..Default::default() 77 }) 78 .start()?; 79 80 tokio::select! { 81 s = server.wait_for_shutdown() => { 82 s.map_err(ServerError::ServerExited)?; 83 log::info!("server shut down normally."); 84 }, 85 _ = shutdown.cancelled() => { 86 log::info!("shutting down: closing server"); 87 server.close().await.map_err(ServerError::BadClose)?; 88 }, 89 } 90 Ok(()) 91} 92 93#[derive(Debug, Clone)] 94struct Context { 95 pub spec: Arc<serde_json::Value>, 96 pub b: broadcast::Sender<Arc<ClientMessage>>, 97 pub d: broadcast::Sender<Arc<ClientMessage>>, 98 pub shutdown: CancellationToken, 99} 100 101async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError> 102where 103 R: HttpResponse, 104 H: Future<Output = Result<R, HttpError>>, 105 T: ServerContext, 106{ 107 let start = Instant::now(); 108 let result = handler.await; 109 let latency = start.elapsed(); 110 let status_code = match &result { 111 Ok(response) => response.status_code(), 112 Err(e) => e.status_code.as_status(), 113 } 114 .as_str() // just the number (.to_string()'s Display does eg `200 OK`) 115 .to_string(); 116 let endpoint = ctx.endpoint.operation_id.clone(); 117 let headers = ctx.request.headers(); 118 let origin = headers 119 .get(ORIGIN) 120 .and_then(|v| v.to_str().ok()) 121 .unwrap_or("") 122 .to_string(); 123 let ua = headers 124 .get(USER_AGENT) 125 .and_then(|v| v.to_str().ok()) 126 .map(|ua| { 127 if ua.starts_with("Mozilla/5.0 ") { 128 "browser" 129 } else { 130 ua 131 } 132 }) 133 .unwrap_or("") 134 .to_string(); 135 counter!("server_requests_total", 136 "endpoint" => endpoint.clone(), 137 "origin" => origin, 138 "ua" => ua, 139 "status_code" => status_code, 140 ) 141 .increment(1); 142 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64); 143 result 144} 145 146use dropshot::{HttpResponseHeaders, HttpResponseOk}; 147 148pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>; 149 150/// Helper for constructing Ok responses: return OkCors(T).into() 151/// (not happy with this yet) 152pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T); 153 154impl<T> From<OkCors<T>> for OkCorsResponse<T> 155where 156 T: Serialize + JsonSchema + Send + Sync, 157{ 158 fn from(ok: OkCors<T>) -> OkCorsResponse<T> { 159 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0)); 160 res.headers_mut() 161 .insert("access-control-allow-origin", "*".parse().unwrap()); 162 Ok(res) 163 } 164} 165 166// TODO: cors for HttpError 167 168/// Serve index page as html 169#[endpoint { 170 method = GET, 171 path = "/", 172 /* 173 * not useful to have this in openapi 174 */ 175 unpublished = true, 176}] 177async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 178 instrument_handler(&ctx, async { 179 Ok(Response::builder() 180 .status(StatusCode::OK) 181 .header(http::header::CONTENT_TYPE, "text/html") 182 .body(INDEX_HTML.into())?) 183 }) 184 .await 185} 186 187/// Serve index page as html 188#[endpoint { 189 method = GET, 190 path = "/favicon.ico", 191 /* 192 * not useful to have this in openapi 193 */ 194 unpublished = true, 195}] 196async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 197 instrument_handler(&ctx, async { 198 Ok(Response::builder() 199 .status(StatusCode::OK) 200 .header(http::header::CONTENT_TYPE, "image/x-icon") 201 .body(FAVICON.to_vec().into())?) 202 }) 203 .await 204} 205 206/// Meta: get the openapi spec for this api 207#[endpoint { 208 method = GET, 209 path = "/openapi", 210 /* 211 * not useful to have this in openapi 212 */ 213 unpublished = true, 214}] 215async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> { 216 instrument_handler(&ctx, async { 217 let spec = (*ctx.context().spec).clone(); 218 OkCors(spec).into() 219 }) 220 .await 221} 222 223/// The real type that gets deserialized 224#[derive(Debug, Deserialize, JsonSchema)] 225#[serde(rename_all = "camelCase")] 226pub struct MultiSubscribeQuery { 227 #[serde(default)] 228 pub wanted_subjects: HashSet<String>, 229 #[serde(default)] 230 pub wanted_subject_dids: HashSet<String>, 231 #[serde(default)] 232 pub wanted_sources: HashSet<String>, 233} 234/// The fake corresponding type for docs that dropshot won't freak out about a 235/// vec for 236#[derive(Deserialize, JsonSchema)] 237#[allow(dead_code)] 238#[serde(rename_all = "camelCase")] 239struct MultiSubscribeQueryForDocs { 240 /// One or more at-uris to receive links about 241 /// 242 /// The at-uri must be url-encoded 243 /// 244 /// Pass this parameter multiple times to specify multiple collections, like 245 /// `wantedSubjects=[...]&wantedSubjects=[...]` 246 pub wanted_subjects: String, 247 /// One or more DIDs to receive links about 248 /// 249 /// Pass this parameter multiple times to specify multiple collections 250 pub wanted_subject_dids: String, 251 /// One or more link sources to receive links about 252 /// 253 /// TODO: docs about link sources 254 /// 255 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri` 256 /// 257 /// Pass this parameter multiple times to specify multiple sources 258 pub wanted_sources: String, 259} 260 261// The `SharedExtractor` implementation for Query<QueryType> describes how to 262// construct an instance of `Query<QueryType>` from an HTTP request: namely, by 263// parsing the query string to an instance of `QueryType`. 264#[async_trait] 265impl SharedExtractor for MultiSubscribeQuery { 266 async fn from_request<Context: ServerContext>( 267 ctx: &RequestContext<Context>, 268 ) -> Result<MultiSubscribeQuery, HttpError> { 269 let raw_query = ctx.request.uri().query().unwrap_or(""); 270 let q = serde_qs::from_str(raw_query).map_err(|e| { 271 HttpError::for_bad_request(None, format!("unable to parse query string: {e}")) 272 })?; 273 Ok(q) 274 } 275 276 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata { 277 // HACK: query type switcheroo: passing MultiSubscribeQuery to 278 // `metadata` would "helpfully" panic because dropshot believes we can 279 // only have scalar types in a query. 280 // 281 // so instead we have a fake second type whose only job is to look the 282 // same as MultiSubscribeQuery exept that it has `String` instead of 283 // `Vec<String>`, which dropshot will accept, and generate ~close-enough 284 // docs for. 285 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type) 286 } 287} 288 289#[derive(Deserialize, JsonSchema)] 290#[serde(rename_all = "camelCase")] 291struct ScalarSubscribeQuery { 292 /// Bypass the 21-sec delay buffer 293 /// 294 /// By default, spacedust holds all firehose links for 21 seconds before 295 /// emitting them, to prevent quickly- undone interactions from generating 296 /// notifications. 297 /// 298 /// Setting `instant` to true bypasses this buffer, allowing faster (and 299 /// noisier) notification delivery. 300 /// 301 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l) 302 /// of links links get deleted within 21s of being created. 303 #[serde(default)] 304 pub instant: bool, 305} 306 307#[channel { 308 protocol = WEBSOCKETS, 309 path = "/subscribe", 310}] 311async fn subscribe( 312 reqctx: RequestContext<Context>, 313 query: MultiSubscribeQuery, 314 scalar_query: Query<ScalarSubscribeQuery>, 315 upgraded: WebsocketConnection, 316) -> dropshot::WebsocketChannelResult { 317 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( 318 upgraded.into_inner(), 319 Role::Server, 320 Some(WebSocketConfig::default().max_message_size( 321 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream 322 )), 323 ) 324 .await; 325 326 let Context { b, d, shutdown, .. } = reqctx.context(); 327 let sub_token = shutdown.child_token(); 328 329 let q = scalar_query.into_inner(); 330 let subscription = if q.instant { b } else { d }.subscribe(); 331 log::info!("starting subscriber with broadcast: instant={}", q.instant); 332 333 Subscriber::new(query, sub_token) 334 .start(ws, subscription) 335 .await 336 .map_err(|e| format!("boo: {e:?}"))?; 337 338 Ok(()) 339}