// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{CachedRecord, Identity, Repo, error::ServerError}; 2use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 3use foyer::HybridCache; 4use serde::Serialize; 5use std::path::PathBuf; 6use std::str::FromStr; 7use std::sync::Arc; 8use tokio_util::sync::CancellationToken; 9 10use poem::{ 11 Endpoint, EndpointExt, Route, Server, 12 endpoint::make_sync, 13 http::Method, 14 listener::{ 15 Listener, TcpListener, 16 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION}, 17 }, 18 middleware::{Cors, Tracing}, 19}; 20use poem_openapi::{ 21 ApiResponse, Object, OpenApi, OpenApiService, param::Query, payload::Json, types::Example, 22}; 23 24fn example_did() -> String { 25 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 26} 27fn example_collection() -> String { 28 "app.bsky.feed.like".to_string() 29} 30fn example_rkey() -> String { 31 "3lv4ouczo2b2a".to_string() 32} 33 34#[derive(Object)] 35#[oai(example = true)] 36struct XrpcErrorResponseObject { 37 /// Should correspond an error `name` in the lexicon errors array 38 error: String, 39 /// Human-readable description and possibly additonal context 40 message: String, 41} 42impl Example for XrpcErrorResponseObject { 43 fn example() -> Self { 44 Self { 45 error: "RecordNotFound".to_string(), 46 message: "This record was deleted".to_string(), 47 } 48 } 49} 50type XrpcError = Json<XrpcErrorResponseObject>; 51fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError { 52 Json(XrpcErrorResponseObject { 53 error: error.as_ref().to_string(), 54 message: message.as_ref().to_string(), 55 }) 56} 57 58fn bad_request_handler(err: poem::Error) -> GetRecordResponse { 59 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 60 error: "InvalidRequest".to_string(), 61 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 62 })) 63} 64 65#[derive(Object)] 66#[oai(example = true)] 67struct FoundRecordResponseObject { 68 /// at-uri for this record 69 uri: String, 70 /// CID for 
this exact version of the record 71 /// 72 /// Slingshot will always return the CID, despite it not being a required 73 /// response property in the official lexicon. 74 cid: Option<String>, 75 /// the record itself as JSON 76 value: serde_json::Value, 77} 78impl Example for FoundRecordResponseObject { 79 fn example() -> Self { 80 Self { 81 uri: format!( 82 "at://{}/{}/{}", 83 example_did(), 84 example_collection(), 85 example_rkey() 86 ), 87 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()), 88 value: serde_json::json!({ 89 "$type": "app.bsky.feed.like", 90 "createdAt": "2025-07-29T18:02:02.327Z", 91 "subject": { 92 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy", 93 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k" 94 } 95 }), 96 } 97 } 98} 99 100#[derive(ApiResponse)] 101#[oai(bad_request_handler = "bad_request_handler")] 102enum GetRecordResponse { 103 /// Record found 104 #[oai(status = 200)] 105 Ok(Json<FoundRecordResponseObject>), 106 /// Bad request or no record to return 107 /// 108 /// The only error name in the repo.getRecord lexicon is `RecordNotFound`, 109 /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 110 /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of 111 /// these, slingshot will only return `RecordNotFound` or `InvalidRequest`. 112 #[oai(status = 400)] 113 BadRequest(XrpcError), 114 /// Just using 500 for potentially upstream errors for now 115 #[oai(status = 500)] 116 ServerError(XrpcError), 117} 118 119struct Xrpc { 120 cache: HybridCache<String, CachedRecord>, 121 identity: Identity, 122 repo: Arc<Repo>, 123} 124 125#[OpenApi] 126impl Xrpc { 127 /// com.atproto.repo.getRecord 128 /// 129 /// Get a single record from a repository. Does not require auth. 
130 /// 131 /// See https://docs.bsky.app/docs/api/com-atproto-repo-get-record for the 132 /// canonical XRPC documentation that this endpoint aims to be compatible 133 /// with. 134 #[oai(path = "/com.atproto.repo.getRecord", method = "get")] 135 async fn get_record( 136 &self, 137 /// The DID or handle of the repo 138 #[oai(example = "example_did")] 139 Query(repo): Query<String>, 140 /// The NSID of the record collection 141 #[oai(example = "example_collection")] 142 Query(collection): Query<String>, 143 /// The Record key 144 #[oai(example = "example_rkey")] 145 Query(rkey): Query<String>, 146 /// Optional: the CID of the version of the record. 147 /// 148 /// If not specified, then return the most recent version. 149 /// 150 /// If specified and a newer version of the record exists, returns 404 not 151 /// found. That is: slingshot only retains the most recent version of a 152 /// record. (TODO: verify bsky behaviour for mismatched/old CID) 153 Query(cid): Query<Option<String>>, 154 ) -> GetRecordResponse { 155 let did = match Did::new(repo.clone()) { 156 Ok(did) => did, 157 Err(_) => { 158 let Ok(handle) = Handle::new(repo) else { 159 return GetRecordResponse::BadRequest(xrpc_error( 160 "InvalidRequest", 161 "repo was not a valid DID or handle", 162 )); 163 }; 164 if let Ok(res) = self.identity.handle_to_did(handle).await { 165 if let Some(did) = res { 166 did 167 } else { 168 return GetRecordResponse::BadRequest(xrpc_error( 169 "InvalidRequest", 170 "Could not resolve handle repo to a DID", 171 )); 172 } 173 } else { 174 return GetRecordResponse::ServerError(xrpc_error( 175 "ResolutionFailed", 176 "errored while trying to resolve handle to DID", 177 )); 178 } 179 } 180 }; 181 182 let Ok(collection) = Nsid::new(collection) else { 183 return GetRecordResponse::BadRequest(xrpc_error( 184 "InvalidRequest", 185 "invalid NSID for collection", 186 )); 187 }; 188 189 let Ok(rkey) = RecordKey::new(rkey) else { 190 return 
GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey")); 191 }; 192 193 let cid: Option<Cid> = if let Some(cid) = cid { 194 let Ok(cid) = Cid::from_str(&cid) else { 195 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID")); 196 }; 197 Some(cid) 198 } else { 199 None 200 }; 201 202 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey); 203 204 let entry = self 205 .cache 206 .fetch(at_uri.clone(), { 207 let cid = cid.clone(); 208 let repo_api = self.repo.clone(); 209 || async move { 210 repo_api 211 .get_record(&did, &collection, &rkey, &cid) 212 .await 213 .map_err(|e| foyer::Error::Other(Box::new(e))) 214 } 215 }) 216 .await 217 .unwrap(); // todo 218 219 // TODO: actual 404 220 221 match *entry { 222 CachedRecord::Found(ref raw) => { 223 let (found_cid, raw_value) = raw.into(); 224 if cid.clone().map(|c| c != found_cid).unwrap_or(false) { 225 return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 226 error: "RecordNotFound".to_string(), 227 message: "A record was found but its CID did not match that requested" 228 .to_string(), 229 })); 230 } 231 // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272 232 let value = 233 serde_json::from_str(raw_value.get()).expect("RawValue to be valid json"); 234 GetRecordResponse::Ok(Json(FoundRecordResponseObject { 235 uri: at_uri, 236 cid: Some(found_cid.as_ref().to_string()), 237 value, 238 })) 239 } 240 CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 241 error: "RecordNotFound".to_string(), 242 message: "This record was deleted".to_string(), 243 })), 244 } 245 } 246 247 // TODO 248 // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")] 249 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")] 250 // but these are both not specified to do bidirectional validation, which is what we want to offer 251 // com.atproto.identity.resolveIdentity seems 
right, but requires returning the full did-doc 252 // would be nice if there were two queries: 253 // did -> verified handle + pds url 254 // handle -> verified did + pds url 255 // 256 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc 257 // but this will *definitely* cause problems because eg. we're not currently storing pubkeys and 258 // those are a little bit important 259} 260 261#[derive(Debug, Clone, Serialize)] 262#[serde(rename_all = "camelCase")] 263struct AppViewService { 264 id: String, 265 r#type: String, 266 service_endpoint: String, 267} 268#[derive(Debug, Clone, Serialize)] 269struct AppViewDoc { 270 id: String, 271 service: [AppViewService; 1], 272} 273/// Serve a did document for did:web for this to be an xrpc appview 274/// 275/// No slingshot endpoints currently require auth, so it's not necessary to do 276/// service proxying, however clients may wish to: 277/// 278/// - PDS proxying offers a level of client IP anonymity from slingshot 279/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future 280fn get_did_doc(host: &str) -> impl Endpoint + use<> { 281 let doc = poem::web::Json(AppViewDoc { 282 id: format!("did:web:{host}"), 283 service: [AppViewService { 284 id: "#slingshot".to_string(), 285 r#type: "SlingshotRecordProxy".to_string(), 286 service_endpoint: format!("https://{host}"), 287 }], 288 }); 289 make_sync(move |_| doc.clone()) 290} 291 292pub async fn serve( 293 cache: HybridCache<String, CachedRecord>, 294 identity: Identity, 295 repo: Repo, 296 host: Option<String>, 297 certs: Option<PathBuf>, 298 _shutdown: CancellationToken, 299) -> Result<(), ServerError> { 300 let repo = Arc::new(repo); 301 let api_service = OpenApiService::new( 302 Xrpc { 303 cache, 304 identity, 305 repo, 306 }, 307 "Slingshot", 308 env!("CARGO_PKG_VERSION"), 309 ) 310 .server("http://localhost:3000") 311 .url_prefix("/xrpc"); 312 313 let mut app = Route::new() 314 
.nest("/", api_service.scalar()) 315 .nest("/openapi.json", api_service.spec_endpoint()) 316 .nest("/xrpc/", api_service); 317 318 if let Some(host) = host { 319 rustls::crypto::aws_lc_rs::default_provider() 320 .install_default() 321 .expect("alskfjalksdjf"); 322 323 app = app.at("/.well-known/did.json", get_did_doc(&host)); 324 325 let mut auto_cert = AutoCert::builder() 326 .directory_url(LETS_ENCRYPT_PRODUCTION) 327 .domain(&host); 328 if let Some(certs) = certs { 329 auto_cert = auto_cert.cache_path(certs) 330 } 331 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 332 333 run(TcpListener::bind("0.0.0.0:443").acme(auto_cert), app).await 334 } else { 335 run(TcpListener::bind("127.0.0.1:3000"), app).await 336 } 337} 338 339async fn run<L>(listener: L, app: Route) -> Result<(), ServerError> 340where 341 L: Listener + 'static, 342{ 343 let app = app 344 .with( 345 Cors::new() 346 .allow_origin_regex("*") 347 .allow_methods([Method::GET]) 348 .allow_credentials(false), 349 ) 350 .with(Tracing); 351 Server::new(listener) 352 .name("slingshot") 353 .run(app) 354 .await 355 .map_err(ServerError::ServerExited) 356}