forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{
2 CachedRecord, ErrorResponseObject, Identity, Repo,
3 error::{RecordError, ServerError},
4};
5use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6use foyer::HybridCache;
7use links::at_uri::parse_at_uri as normalize_at_uri;
8use serde::Serialize;
9use std::path::PathBuf;
10use std::str::FromStr;
11use std::sync::Arc;
12use tokio_util::sync::CancellationToken;
13
14use poem::{
15 Endpoint, EndpointExt, Route, Server,
16 endpoint::make_sync,
17 http::Method,
18 listener::{
19 Listener, TcpListener,
20 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
21 },
22 middleware::{Cors, Tracing},
23};
24use poem_openapi::{
25 ApiResponse, Object, OpenApi, OpenApiService, param::Query, payload::Json, types::Example,
26};
27
28fn example_did() -> String {
29 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
30}
31fn example_collection() -> String {
32 "app.bsky.feed.like".to_string()
33}
34fn example_rkey() -> String {
35 "3lv4ouczo2b2a".to_string()
36}
37fn example_uri() -> String {
38 format!(
39 "at://{}/{}/{}",
40 example_did(),
41 example_collection(),
42 example_rkey()
43 )
44}
45
46#[derive(Object)]
47#[oai(example = true)]
48struct XrpcErrorResponseObject {
49 /// Should correspond an error `name` in the lexicon errors array
50 error: String,
51 /// Human-readable description and possibly additonal context
52 message: String,
53}
54impl Example for XrpcErrorResponseObject {
55 fn example() -> Self {
56 Self {
57 error: "RecordNotFound".to_string(),
58 message: "This record was deleted".to_string(),
59 }
60 }
61}
62type XrpcError = Json<XrpcErrorResponseObject>;
63fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError {
64 Json(XrpcErrorResponseObject {
65 error: error.as_ref().to_string(),
66 message: message.as_ref().to_string(),
67 })
68}
69
70fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
71 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
72 error: "InvalidRequest".to_string(),
73 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
74 }))
75}
76
77#[derive(Object)]
78#[oai(example = true)]
79struct FoundRecordResponseObject {
80 /// at-uri for this record
81 uri: String,
82 /// CID for this exact version of the record
83 ///
84 /// Slingshot will always return the CID, despite it not being a required
85 /// response property in the official lexicon.
86 ///
87 /// TODO: probably actually let it be optional, idk are some pds's weirdly
88 /// not returning it?
89 cid: Option<String>,
90 /// the record itself as JSON
91 value: serde_json::Value,
92}
93impl Example for FoundRecordResponseObject {
94 fn example() -> Self {
95 Self {
96 uri: example_uri(),
97 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
98 value: serde_json::json!({
99 "$type": "app.bsky.feed.like",
100 "createdAt": "2025-07-29T18:02:02.327Z",
101 "subject": {
102 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy",
103 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k"
104 }
105 }),
106 }
107 }
108}
109
110#[derive(ApiResponse)]
111#[oai(bad_request_handler = "bad_request_handler")]
112enum GetRecordResponse {
113 /// Record found
114 #[oai(status = 200)]
115 Ok(Json<FoundRecordResponseObject>),
116 /// Bad request or no record to return
117 ///
118 /// The only error name in the repo.getRecord lexicon is `RecordNotFound`,
119 /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
120 /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
121 /// these, slingshot will only generate `RecordNotFound` or `InvalidRequest`,
122 /// but may return any proxied error code from the upstream repo.
123 #[oai(status = 400)]
124 BadRequest(XrpcError),
125 /// Server errors
126 #[oai(status = 500)]
127 ServerError(XrpcError),
128}
129
130struct Xrpc {
131 cache: HybridCache<String, CachedRecord>,
132 identity: Identity,
133 repo: Arc<Repo>,
134}
135
136#[OpenApi]
137impl Xrpc {
138 /// com.atproto.repo.getRecord
139 ///
140 /// Get a single record from a repository. Does not require auth.
141 ///
142 /// See also the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
143 /// that this endpoint aims to be compatible with.
144 #[oai(path = "/com.atproto.repo.getRecord", method = "get")]
145 async fn get_record(
146 &self,
147 /// The DID or handle of the repo
148 #[oai(example = "example_did")]
149 Query(repo): Query<String>,
150 /// The NSID of the record collection
151 #[oai(example = "example_collection")]
152 Query(collection): Query<String>,
153 /// The Record key
154 #[oai(example = "example_rkey")]
155 Query(rkey): Query<String>,
156 /// Optional: the CID of the version of the record.
157 ///
158 /// If not specified, then return the most recent version.
159 ///
160 /// If specified and a newer version of the record exists, returns 404 not
161 /// found. That is: slingshot only retains the most recent version of a
162 /// record. (TODO: verify bsky behaviour for mismatched/old CID)
163 Query(cid): Query<Option<String>>,
164 ) -> GetRecordResponse {
165 self.get_record_impl(repo, collection, rkey, cid).await
166 }
167
168 /// com.bad-example.repo.getUriRecord
169 ///
170 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
171 /// which accepts an at-uri instead of individual rep/collection/rkey params
172 #[oai(path = "/com.bad-example.repo.getUriRecord", method = "get")]
173 async fn get_uri_record(
174 &self,
175 /// The at-uri of the record
176 ///
177 /// The identifier can be a DID or an atproto handle, and the collection
178 /// and rkey segments must be present.
179 #[oai(example = "example_uri")]
180 Query(at_uri): Query<String>,
181 /// Optional: the CID of the version of the record.
182 ///
183 /// If not specified, then return the most recent version.
184 ///
185 /// If specified and a newer version of the record exists, returns 404 not
186 /// found. That is: slingshot only retains the most recent version of a
187 /// record.
188 Query(cid): Query<Option<String>>,
189 ) -> GetRecordResponse {
190 let bad_at_uri = || {
191 GetRecordResponse::BadRequest(xrpc_error(
192 "InvalidRequest",
193 "at-uri does not appear to be valid",
194 ))
195 };
196
197 let Some(normalized) = normalize_at_uri(&at_uri) else {
198 return bad_at_uri();
199 };
200
201 // TODO: move this to links
202 let Some(rest) = normalized.strip_prefix("at://") else {
203 return bad_at_uri();
204 };
205 let Some((repo, rest)) = rest.split_once('/') else {
206 return bad_at_uri();
207 };
208 let Some((collection, rest)) = rest.split_once('/') else {
209 return bad_at_uri();
210 };
211 let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
212 rkey
213 } else {
214 rest
215 };
216
217 self.get_record_impl(
218 repo.to_string(),
219 collection.to_string(),
220 rkey.to_string(),
221 cid,
222 )
223 .await
224 }
225
226 async fn get_record_impl(
227 &self,
228 repo: String,
229 collection: String,
230 rkey: String,
231 cid: Option<String>,
232 ) -> GetRecordResponse {
233 let did = match Did::new(repo.clone()) {
234 Ok(did) => did,
235 Err(_) => {
236 let Ok(handle) = Handle::new(repo) else {
237 return GetRecordResponse::BadRequest(xrpc_error(
238 "InvalidRequest",
239 "repo was not a valid DID or handle",
240 ));
241 };
242 if let Ok(res) = self.identity.handle_to_did(handle).await {
243 if let Some(did) = res {
244 did
245 } else {
246 return GetRecordResponse::BadRequest(xrpc_error(
247 "InvalidRequest",
248 "Could not resolve handle repo to a DID",
249 ));
250 }
251 } else {
252 return GetRecordResponse::ServerError(xrpc_error(
253 "ResolutionFailed",
254 "errored while trying to resolve handle to DID",
255 ));
256 }
257 }
258 };
259
260 let Ok(collection) = Nsid::new(collection) else {
261 return GetRecordResponse::BadRequest(xrpc_error(
262 "InvalidRequest",
263 "invalid NSID for collection",
264 ));
265 };
266
267 let Ok(rkey) = RecordKey::new(rkey) else {
268 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
269 };
270
271 let cid: Option<Cid> = if let Some(cid) = cid {
272 let Ok(cid) = Cid::from_str(&cid) else {
273 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
274 };
275 Some(cid)
276 } else {
277 None
278 };
279
280 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey);
281
282 let fr = self
283 .cache
284 .fetch(at_uri.clone(), {
285 let cid = cid.clone();
286 let repo_api = self.repo.clone();
287 || async move {
288 repo_api
289 .get_record(&did, &collection, &rkey, &cid)
290 .await
291 .map_err(|e| foyer::Error::Other(Box::new(e)))
292 }
293 })
294 .await;
295
296 let entry = match fr {
297 Ok(e) => e,
298 Err(foyer::Error::Other(e)) => {
299 let record_error = match e.downcast::<RecordError>() {
300 Ok(e) => e,
301 Err(e) => {
302 log::error!("error (foyer other) getting cache entry, {e:?}");
303 return GetRecordResponse::ServerError(xrpc_error(
304 "ServerError",
305 "sorry, something went wrong",
306 ));
307 }
308 };
309 let RecordError::UpstreamBadRequest(ErrorResponseObject { error, message }) =
310 *record_error
311 else {
312 log::error!("RecordError getting cache entry, {record_error:?}");
313 return GetRecordResponse::ServerError(xrpc_error(
314 "ServerError",
315 "sorry, something went wrong",
316 ));
317 };
318
319 // all of the noise around here is so that we can ultimately reach this:
320 // upstream BadRequest extracted from the foyer result which we can proxy back
321 return GetRecordResponse::BadRequest(xrpc_error(
322 error,
323 format!("Upstream bad request: {message}"),
324 ));
325 }
326 Err(e) => {
327 log::error!("error (foyer) getting cache entry, {e:?}");
328 return GetRecordResponse::ServerError(xrpc_error(
329 "ServerError",
330 "sorry, something went wrong",
331 ));
332 }
333 };
334
335 match *entry {
336 CachedRecord::Found(ref raw) => {
337 let (found_cid, raw_value) = raw.into();
338 if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
339 return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
340 error: "RecordNotFound".to_string(),
341 message: "A record was found but its CID did not match that requested"
342 .to_string(),
343 }));
344 }
345 // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
346 let value =
347 serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
348 GetRecordResponse::Ok(Json(FoundRecordResponseObject {
349 uri: at_uri,
350 cid: Some(found_cid.as_ref().to_string()),
351 value,
352 }))
353 }
354 CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
355 error: "RecordNotFound".to_string(),
356 message: "This record was deleted".to_string(),
357 })),
358 }
359 }
360
361 // TODO
362 // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")]
363 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")]
364 // but these are both not specified to do bidirectional validation, which is what we want to offer
365 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc
366 // would be nice if there were two queries:
367 // did -> verified handle + pds url
368 // handle -> verified did + pds url
369 //
370 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc
371 // but this will *definitely* cause problems because eg. we're not currently storing pubkeys and
372 // those are a little bit important
373}
374
375#[derive(Debug, Clone, Serialize)]
376#[serde(rename_all = "camelCase")]
377struct AppViewService {
378 id: String,
379 r#type: String,
380 service_endpoint: String,
381}
382#[derive(Debug, Clone, Serialize)]
383struct AppViewDoc {
384 id: String,
385 service: [AppViewService; 1],
386}
387/// Serve a did document for did:web for this to be an xrpc appview
388///
389/// No slingshot endpoints currently require auth, so it's not necessary to do
390/// service proxying, however clients may wish to:
391///
392/// - PDS proxying offers a level of client IP anonymity from slingshot
393/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future
394fn get_did_doc(host: &str) -> impl Endpoint + use<> {
395 let doc = poem::web::Json(AppViewDoc {
396 id: format!("did:web:{host}"),
397 service: [AppViewService {
398 id: "#slingshot".to_string(),
399 r#type: "SlingshotRecordProxy".to_string(),
400 service_endpoint: format!("https://{host}"),
401 }],
402 });
403 make_sync(move |_| doc.clone())
404}
405
406pub async fn serve(
407 cache: HybridCache<String, CachedRecord>,
408 identity: Identity,
409 repo: Repo,
410 host: Option<String>,
411 acme_contact: Option<String>,
412 certs: Option<PathBuf>,
413 _shutdown: CancellationToken,
414) -> Result<(), ServerError> {
415 let repo = Arc::new(repo);
416 let api_service = OpenApiService::new(
417 Xrpc {
418 cache,
419 identity,
420 repo,
421 },
422 "Slingshot",
423 env!("CARGO_PKG_VERSION"),
424 )
425 .server(if let Some(ref h) = host {
426 format!("https://{h}")
427 } else {
428 "http://localhost:3000".to_string()
429 })
430 .url_prefix("/xrpc");
431
432 let mut app = Route::new()
433 .nest("/", api_service.scalar())
434 .nest("/openapi.json", api_service.spec_endpoint())
435 .nest("/xrpc/", api_service);
436
437 if let Some(host) = host {
438 rustls::crypto::aws_lc_rs::default_provider()
439 .install_default()
440 .expect("alskfjalksdjf");
441
442 app = app.at("/.well-known/did.json", get_did_doc(&host));
443
444 let mut auto_cert = AutoCert::builder()
445 .directory_url(LETS_ENCRYPT_PRODUCTION)
446 .domain(&host);
447 if let Some(contact) = acme_contact {
448 auto_cert = auto_cert.contact(contact);
449 }
450 if let Some(certs) = certs {
451 auto_cert = auto_cert.cache_path(certs);
452 }
453 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?;
454
455 run(TcpListener::bind("0.0.0.0:443").acme(auto_cert), app).await
456 } else {
457 run(TcpListener::bind("127.0.0.1:3000"), app).await
458 }
459}
460
461async fn run<L>(listener: L, app: Route) -> Result<(), ServerError>
462where
463 L: Listener + 'static,
464{
465 let app = app
466 .with(
467 Cors::new()
468 .allow_origin_regex("*")
469 .allow_methods([Method::GET])
470 .allow_credentials(false),
471 )
472 .with(Tracing);
473 Server::new(listener)
474 .name("slingshot")
475 .run(app)
476 .await
477 .map_err(ServerError::ServerExited)
478}