// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ 2 CachedRecord, ErrorResponseObject, Identity, Repo, 3 error::{RecordError, ServerError}, 4}; 5use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 6use foyer::HybridCache; 7use links::at_uri::parse_at_uri as normalize_at_uri; 8use serde::Serialize; 9use std::path::PathBuf; 10use std::str::FromStr; 11use std::sync::Arc; 12use tokio_util::sync::CancellationToken; 13 14use poem::{ 15 Endpoint, EndpointExt, Route, Server, 16 endpoint::make_sync, 17 http::Method, 18 listener::{ 19 Listener, TcpListener, 20 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION}, 21 }, 22 middleware::{Cors, Tracing}, 23}; 24use poem_openapi::{ 25 ApiResponse, Object, OpenApi, OpenApiService, param::Query, payload::Json, types::Example, 26}; 27 28fn example_did() -> String { 29 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 30} 31fn example_collection() -> String { 32 "app.bsky.feed.like".to_string() 33} 34fn example_rkey() -> String { 35 "3lv4ouczo2b2a".to_string() 36} 37fn example_uri() -> String { 38 format!( 39 "at://{}/{}/{}", 40 example_did(), 41 example_collection(), 42 example_rkey() 43 ) 44} 45 46#[derive(Object)] 47#[oai(example = true)] 48struct XrpcErrorResponseObject { 49 /// Should correspond an error `name` in the lexicon errors array 50 error: String, 51 /// Human-readable description and possibly additonal context 52 message: String, 53} 54impl Example for XrpcErrorResponseObject { 55 fn example() -> Self { 56 Self { 57 error: "RecordNotFound".to_string(), 58 message: "This record was deleted".to_string(), 59 } 60 } 61} 62type XrpcError = Json<XrpcErrorResponseObject>; 63fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError { 64 Json(XrpcErrorResponseObject { 65 error: error.as_ref().to_string(), 66 message: message.as_ref().to_string(), 67 }) 68} 69 70fn bad_request_handler(err: poem::Error) -> GetRecordResponse { 71 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 72 error: "InvalidRequest".to_string(), 73 message: 
format!("Bad request, here's some info that maybe should not be exposed: {err}"), 74 })) 75} 76 77#[derive(Object)] 78#[oai(example = true)] 79struct FoundRecordResponseObject { 80 /// at-uri for this record 81 uri: String, 82 /// CID for this exact version of the record 83 /// 84 /// Slingshot will always return the CID, despite it not being a required 85 /// response property in the official lexicon. 86 /// 87 /// TODO: probably actually let it be optional, idk are some pds's weirdly 88 /// not returning it? 89 cid: Option<String>, 90 /// the record itself as JSON 91 value: serde_json::Value, 92} 93impl Example for FoundRecordResponseObject { 94 fn example() -> Self { 95 Self { 96 uri: example_uri(), 97 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()), 98 value: serde_json::json!({ 99 "$type": "app.bsky.feed.like", 100 "createdAt": "2025-07-29T18:02:02.327Z", 101 "subject": { 102 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy", 103 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k" 104 } 105 }), 106 } 107 } 108} 109 110#[derive(ApiResponse)] 111#[oai(bad_request_handler = "bad_request_handler")] 112enum GetRecordResponse { 113 /// Record found 114 #[oai(status = 200)] 115 Ok(Json<FoundRecordResponseObject>), 116 /// Bad request or no record to return 117 /// 118 /// The only error name in the repo.getRecord lexicon is `RecordNotFound`, 119 /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 120 /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of 121 /// these, slingshot will only generate `RecordNotFound` or `InvalidRequest`, 122 /// but may return any proxied error code from the upstream repo. 
123 #[oai(status = 400)] 124 BadRequest(XrpcError), 125 /// Server errors 126 #[oai(status = 500)] 127 ServerError(XrpcError), 128} 129 130struct Xrpc { 131 cache: HybridCache<String, CachedRecord>, 132 identity: Identity, 133 repo: Arc<Repo>, 134} 135 136#[OpenApi] 137impl Xrpc { 138 /// com.atproto.repo.getRecord 139 /// 140 /// Get a single record from a repository. Does not require auth. 141 /// 142 /// See also the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 143 /// that this endpoint aims to be compatible with. 144 #[oai(path = "/com.atproto.repo.getRecord", method = "get")] 145 async fn get_record( 146 &self, 147 /// The DID or handle of the repo 148 #[oai(example = "example_did")] 149 Query(repo): Query<String>, 150 /// The NSID of the record collection 151 #[oai(example = "example_collection")] 152 Query(collection): Query<String>, 153 /// The Record key 154 #[oai(example = "example_rkey")] 155 Query(rkey): Query<String>, 156 /// Optional: the CID of the version of the record. 157 /// 158 /// If not specified, then return the most recent version. 159 /// 160 /// If specified and a newer version of the record exists, returns 404 not 161 /// found. That is: slingshot only retains the most recent version of a 162 /// record. 
(TODO: verify bsky behaviour for mismatched/old CID) 163 Query(cid): Query<Option<String>>, 164 ) -> GetRecordResponse { 165 self.get_record_impl(repo, collection, rkey, cid).await 166 } 167 168 /// com.bad-example.repo.getUriRecord 169 /// 170 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 171 /// which accepts an at-uri instead of individual rep/collection/rkey params 172 #[oai(path = "/com.bad-example.repo.getUriRecord", method = "get")] 173 async fn get_uri_record( 174 &self, 175 /// The at-uri of the record 176 /// 177 /// The identifier can be a DID or an atproto handle, and the collection 178 /// and rkey segments must be present. 179 #[oai(example = "example_uri")] 180 Query(at_uri): Query<String>, 181 /// Optional: the CID of the version of the record. 182 /// 183 /// If not specified, then return the most recent version. 184 /// 185 /// If specified and a newer version of the record exists, returns 404 not 186 /// found. That is: slingshot only retains the most recent version of a 187 /// record. 
188 Query(cid): Query<Option<String>>, 189 ) -> GetRecordResponse { 190 let bad_at_uri = || { 191 GetRecordResponse::BadRequest(xrpc_error( 192 "InvalidRequest", 193 "at-uri does not appear to be valid", 194 )) 195 }; 196 197 let Some(normalized) = normalize_at_uri(&at_uri) else { 198 return bad_at_uri(); 199 }; 200 201 // TODO: move this to links 202 let Some(rest) = normalized.strip_prefix("at://") else { 203 return bad_at_uri(); 204 }; 205 let Some((repo, rest)) = rest.split_once('/') else { 206 return bad_at_uri(); 207 }; 208 let Some((collection, rest)) = rest.split_once('/') else { 209 return bad_at_uri(); 210 }; 211 let rkey = if let Some((rkey, _rest)) = rest.split_once('?') { 212 rkey 213 } else { 214 rest 215 }; 216 217 self.get_record_impl( 218 repo.to_string(), 219 collection.to_string(), 220 rkey.to_string(), 221 cid, 222 ) 223 .await 224 } 225 226 async fn get_record_impl( 227 &self, 228 repo: String, 229 collection: String, 230 rkey: String, 231 cid: Option<String>, 232 ) -> GetRecordResponse { 233 let did = match Did::new(repo.clone()) { 234 Ok(did) => did, 235 Err(_) => { 236 let Ok(handle) = Handle::new(repo) else { 237 return GetRecordResponse::BadRequest(xrpc_error( 238 "InvalidRequest", 239 "repo was not a valid DID or handle", 240 )); 241 }; 242 if let Ok(res) = self.identity.handle_to_did(handle).await { 243 if let Some(did) = res { 244 did 245 } else { 246 return GetRecordResponse::BadRequest(xrpc_error( 247 "InvalidRequest", 248 "Could not resolve handle repo to a DID", 249 )); 250 } 251 } else { 252 return GetRecordResponse::ServerError(xrpc_error( 253 "ResolutionFailed", 254 "errored while trying to resolve handle to DID", 255 )); 256 } 257 } 258 }; 259 260 let Ok(collection) = Nsid::new(collection) else { 261 return GetRecordResponse::BadRequest(xrpc_error( 262 "InvalidRequest", 263 "invalid NSID for collection", 264 )); 265 }; 266 267 let Ok(rkey) = RecordKey::new(rkey) else { 268 return 
GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey")); 269 }; 270 271 let cid: Option<Cid> = if let Some(cid) = cid { 272 let Ok(cid) = Cid::from_str(&cid) else { 273 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID")); 274 }; 275 Some(cid) 276 } else { 277 None 278 }; 279 280 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey); 281 282 let fr = self 283 .cache 284 .fetch(at_uri.clone(), { 285 let cid = cid.clone(); 286 let repo_api = self.repo.clone(); 287 || async move { 288 repo_api 289 .get_record(&did, &collection, &rkey, &cid) 290 .await 291 .map_err(|e| foyer::Error::Other(Box::new(e))) 292 } 293 }) 294 .await; 295 296 let entry = match fr { 297 Ok(e) => e, 298 Err(foyer::Error::Other(e)) => { 299 let record_error = match e.downcast::<RecordError>() { 300 Ok(e) => e, 301 Err(e) => { 302 log::error!("error (foyer other) getting cache entry, {e:?}"); 303 return GetRecordResponse::ServerError(xrpc_error( 304 "ServerError", 305 "sorry, something went wrong", 306 )); 307 } 308 }; 309 let RecordError::UpstreamBadRequest(ErrorResponseObject { error, message }) = 310 *record_error 311 else { 312 log::error!("RecordError getting cache entry, {record_error:?}"); 313 return GetRecordResponse::ServerError(xrpc_error( 314 "ServerError", 315 "sorry, something went wrong", 316 )); 317 }; 318 319 // all of the noise around here is so that we can ultimately reach this: 320 // upstream BadRequest extracted from the foyer result which we can proxy back 321 return GetRecordResponse::BadRequest(xrpc_error( 322 error, 323 format!("Upstream bad request: {message}"), 324 )); 325 } 326 Err(e) => { 327 log::error!("error (foyer) getting cache entry, {e:?}"); 328 return GetRecordResponse::ServerError(xrpc_error( 329 "ServerError", 330 "sorry, something went wrong", 331 )); 332 } 333 }; 334 335 match *entry { 336 CachedRecord::Found(ref raw) => { 337 let (found_cid, raw_value) = raw.into(); 338 if cid.clone().map(|c| c 
!= found_cid).unwrap_or(false) { 339 return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 340 error: "RecordNotFound".to_string(), 341 message: "A record was found but its CID did not match that requested" 342 .to_string(), 343 })); 344 } 345 // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272 346 let value = 347 serde_json::from_str(raw_value.get()).expect("RawValue to be valid json"); 348 GetRecordResponse::Ok(Json(FoundRecordResponseObject { 349 uri: at_uri, 350 cid: Some(found_cid.as_ref().to_string()), 351 value, 352 })) 353 } 354 CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 355 error: "RecordNotFound".to_string(), 356 message: "This record was deleted".to_string(), 357 })), 358 } 359 } 360 361 // TODO 362 // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")] 363 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")] 364 // but these are both not specified to do bidirectional validation, which is what we want to offer 365 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc 366 // would be nice if there were two queries: 367 // did -> verified handle + pds url 368 // handle -> verified did + pds url 369 // 370 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc 371 // but this will *definitely* cause problems because eg. 
we're not currently storing pubkeys and 372 // those are a little bit important 373} 374 375#[derive(Debug, Clone, Serialize)] 376#[serde(rename_all = "camelCase")] 377struct AppViewService { 378 id: String, 379 r#type: String, 380 service_endpoint: String, 381} 382#[derive(Debug, Clone, Serialize)] 383struct AppViewDoc { 384 id: String, 385 service: [AppViewService; 1], 386} 387/// Serve a did document for did:web for this to be an xrpc appview 388/// 389/// No slingshot endpoints currently require auth, so it's not necessary to do 390/// service proxying, however clients may wish to: 391/// 392/// - PDS proxying offers a level of client IP anonymity from slingshot 393/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future 394fn get_did_doc(host: &str) -> impl Endpoint + use<> { 395 let doc = poem::web::Json(AppViewDoc { 396 id: format!("did:web:{host}"), 397 service: [AppViewService { 398 id: "#slingshot".to_string(), 399 r#type: "SlingshotRecordProxy".to_string(), 400 service_endpoint: format!("https://{host}"), 401 }], 402 }); 403 make_sync(move |_| doc.clone()) 404} 405 406pub async fn serve( 407 cache: HybridCache<String, CachedRecord>, 408 identity: Identity, 409 repo: Repo, 410 host: Option<String>, 411 acme_contact: Option<String>, 412 certs: Option<PathBuf>, 413 _shutdown: CancellationToken, 414) -> Result<(), ServerError> { 415 let repo = Arc::new(repo); 416 let api_service = OpenApiService::new( 417 Xrpc { 418 cache, 419 identity, 420 repo, 421 }, 422 "Slingshot", 423 env!("CARGO_PKG_VERSION"), 424 ) 425 .server(if let Some(ref h) = host { 426 format!("https://{h}") 427 } else { 428 "http://localhost:3000".to_string() 429 }) 430 .url_prefix("/xrpc"); 431 432 let mut app = Route::new() 433 .nest("/", api_service.scalar()) 434 .nest("/openapi.json", api_service.spec_endpoint()) 435 .nest("/xrpc/", api_service); 436 437 if let Some(host) = host { 438 rustls::crypto::aws_lc_rs::default_provider() 439 
.install_default() 440 .expect("alskfjalksdjf"); 441 442 app = app.at("/.well-known/did.json", get_did_doc(&host)); 443 444 let mut auto_cert = AutoCert::builder() 445 .directory_url(LETS_ENCRYPT_PRODUCTION) 446 .domain(&host); 447 if let Some(contact) = acme_contact { 448 auto_cert = auto_cert.contact(contact); 449 } 450 if let Some(certs) = certs { 451 auto_cert = auto_cert.cache_path(certs); 452 } 453 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 454 455 run(TcpListener::bind("0.0.0.0:443").acme(auto_cert), app).await 456 } else { 457 run(TcpListener::bind("127.0.0.1:3000"), app).await 458 } 459} 460 461async fn run<L>(listener: L, app: Route) -> Result<(), ServerError> 462where 463 L: Listener + 'static, 464{ 465 let app = app 466 .with( 467 Cors::new() 468 .allow_origin_regex("*") 469 .allow_methods([Method::GET]) 470 .allow_credentials(false), 471 ) 472 .with(Tracing); 473 Server::new(listener) 474 .name("slingshot") 475 .run(app) 476 .await 477 .map_err(ServerError::ServerExited) 478}