forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{CachedRecord, Identity, Repo, error::ServerError};
2use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
3use foyer::HybridCache;
4use serde::Serialize;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10use poem::{
11 Endpoint, EndpointExt, Route, Server,
12 endpoint::make_sync,
13 http::Method,
14 listener::{
15 Listener, TcpListener,
16 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
17 },
18 middleware::{Cors, Tracing},
19};
20use poem_openapi::{
21 ApiResponse, Object, OpenApi, OpenApiService, param::Query, payload::Json, types::Example,
22};
23
/// Sample DID used as the example value for the `repo` query parameter.
fn example_did() -> String {
    String::from("did:plc:hdhoaan3xa3jiuq4fg4mefid")
}
/// Sample collection NSID used as the example value for the `collection`
/// query parameter.
fn example_collection() -> String {
    String::from("app.bsky.feed.like")
}
/// Sample record key used as the example value for the `rkey` query
/// parameter.
fn example_rkey() -> String {
    String::from("3lv4ouczo2b2a")
}
33
34#[derive(Object)]
35#[oai(example = true)]
36struct XrpcErrorResponseObject {
37 /// Should correspond an error `name` in the lexicon errors array
38 error: String,
39 /// Human-readable description and possibly additonal context
40 message: String,
41}
42impl Example for XrpcErrorResponseObject {
43 fn example() -> Self {
44 Self {
45 error: "RecordNotFound".to_string(),
46 message: "This record was deleted".to_string(),
47 }
48 }
49}
50type XrpcError = Json<XrpcErrorResponseObject>;
51fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError {
52 Json(XrpcErrorResponseObject {
53 error: error.as_ref().to_string(),
54 message: message.as_ref().to_string(),
55 })
56}
57
58fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
59 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
60 error: "InvalidRequest".to_string(),
61 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
62 }))
63}
64
// Success body for `com.atproto.repo.getRecord`: the record's at-uri, its
// CID and the record value itself.
//
// NOTE: `///` comments on the fields below are consumed by the `Object`
// derive (published schema descriptions), so maintainer-only notes are kept
// in plain `//` comments like this one.
#[derive(Object)]
#[oai(example = true)]
struct FoundRecordResponseObject {
    /// at-uri for this record
    uri: String,
    /// CID for this exact version of the record
    ///
    /// Slingshot will always return the CID, despite it not being a required
    /// response property in the official lexicon.
    // Kept as `Option` even though `get_record` always fills it with `Some`.
    cid: Option<String>,
    /// the record itself as JSON
    value: serde_json::Value,
}
78impl Example for FoundRecordResponseObject {
79 fn example() -> Self {
80 Self {
81 uri: format!(
82 "at://{}/{}/{}",
83 example_did(),
84 example_collection(),
85 example_rkey()
86 ),
87 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
88 value: serde_json::json!({
89 "$type": "app.bsky.feed.like",
90 "createdAt": "2025-07-29T18:02:02.327Z",
91 "subject": {
92 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy",
93 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k"
94 }
95 }),
96 }
97 }
98}
99
// All responses `get_record` can produce, one variant per HTTP status.
//
// NOTE: the `///` comments on the variants are consumed by the `ApiResponse`
// derive (published response descriptions); maintainer-only notes use `//`.
#[derive(ApiResponse)]
// Errors handed to `bad_request_handler` are converted into the `BadRequest`
// variant (presumably request-parsing failures raised before `get_record`
// runs — confirm against the poem-openapi docs).
#[oai(bad_request_handler = "bad_request_handler")]
enum GetRecordResponse {
    /// Record found
    #[oai(status = 200)]
    Ok(Json<FoundRecordResponseObject>),
    /// Bad request or no record to return
    ///
    /// The only error name in the repo.getRecord lexicon is `RecordNotFound`,
    /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
    /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
    /// these, slingshot will only return `RecordNotFound` or `InvalidRequest`.
    // Note that "record not found" is reported with status 400, not 404.
    #[oai(status = 400)]
    BadRequest(XrpcError),
    /// Just using 500 for potentially upstream errors for now
    #[oai(status = 500)]
    ServerError(XrpcError),
}
118
/// Shared state behind the XRPC API handlers.
struct Xrpc {
    /// Record cache; `get_record` keys it by the record's at-uri string.
    cache: HybridCache<String, CachedRecord>,
    /// Identity resolver; `get_record` uses it to turn a handle into a DID.
    identity: Identity,
    /// Record fetcher called from the closure handed to `cache.fetch`
    /// (presumably only run on a cache miss — confirm against foyer's docs).
    repo: Arc<Repo>,
}
124
125#[OpenApi]
126impl Xrpc {
127 /// com.atproto.repo.getRecord
128 ///
129 /// Get a single record from a repository. Does not require auth.
130 ///
131 /// See https://docs.bsky.app/docs/api/com-atproto-repo-get-record for the
132 /// canonical XRPC documentation that this endpoint aims to be compatible
133 /// with.
134 #[oai(path = "/com.atproto.repo.getRecord", method = "get")]
135 async fn get_record(
136 &self,
137 /// The DID or handle of the repo
138 #[oai(example = "example_did")]
139 Query(repo): Query<String>,
140 /// The NSID of the record collection
141 #[oai(example = "example_collection")]
142 Query(collection): Query<String>,
143 /// The Record key
144 #[oai(example = "example_rkey")]
145 Query(rkey): Query<String>,
146 /// Optional: the CID of the version of the record.
147 ///
148 /// If not specified, then return the most recent version.
149 ///
150 /// If specified and a newer version of the record exists, returns 404 not
151 /// found. That is: slingshot only retains the most recent version of a
152 /// record. (TODO: verify bsky behaviour for mismatched/old CID)
153 Query(cid): Query<Option<String>>,
154 ) -> GetRecordResponse {
155 let did = match Did::new(repo.clone()) {
156 Ok(did) => did,
157 Err(_) => {
158 let Ok(handle) = Handle::new(repo) else {
159 return GetRecordResponse::BadRequest(xrpc_error(
160 "InvalidRequest",
161 "repo was not a valid DID or handle",
162 ));
163 };
164 if let Ok(res) = self.identity.handle_to_did(handle).await {
165 if let Some(did) = res {
166 did
167 } else {
168 return GetRecordResponse::BadRequest(xrpc_error(
169 "InvalidRequest",
170 "Could not resolve handle repo to a DID",
171 ));
172 }
173 } else {
174 return GetRecordResponse::ServerError(xrpc_error(
175 "ResolutionFailed",
176 "errored while trying to resolve handle to DID",
177 ));
178 }
179 }
180 };
181
182 let Ok(collection) = Nsid::new(collection) else {
183 return GetRecordResponse::BadRequest(xrpc_error(
184 "InvalidRequest",
185 "invalid NSID for collection",
186 ));
187 };
188
189 let Ok(rkey) = RecordKey::new(rkey) else {
190 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
191 };
192
193 let cid: Option<Cid> = if let Some(cid) = cid {
194 let Ok(cid) = Cid::from_str(&cid) else {
195 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
196 };
197 Some(cid)
198 } else {
199 None
200 };
201
202 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey);
203
204 let entry = self
205 .cache
206 .fetch(at_uri.clone(), {
207 let cid = cid.clone();
208 let repo_api = self.repo.clone();
209 || async move {
210 repo_api
211 .get_record(&did, &collection, &rkey, &cid)
212 .await
213 .map_err(|e| foyer::Error::Other(Box::new(e)))
214 }
215 })
216 .await
217 .unwrap(); // todo
218
219 // TODO: actual 404
220
221 match *entry {
222 CachedRecord::Found(ref raw) => {
223 let (found_cid, raw_value) = raw.into();
224 if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
225 return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
226 error: "RecordNotFound".to_string(),
227 message: "A record was found but its CID did not match that requested"
228 .to_string(),
229 }));
230 }
231 // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
232 let value =
233 serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
234 GetRecordResponse::Ok(Json(FoundRecordResponseObject {
235 uri: at_uri,
236 cid: Some(found_cid.as_ref().to_string()),
237 value,
238 }))
239 }
240 CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
241 error: "RecordNotFound".to_string(),
242 message: "This record was deleted".to_string(),
243 })),
244 }
245 }
246
247 // TODO
248 // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")]
249 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")]
250 // but these are both not specified to do bidirectional validation, which is what we want to offer
251 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc
252 // would be nice if there were two queries:
253 // did -> verified handle + pds url
254 // handle -> verified did + pds url
255 //
256 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc
257 // but this will *definitely* cause problems because eg. we're not currently storing pubkeys and
258 // those are a little bit important
259}
260
/// One entry of the `service` array in the did document built by
/// `get_did_doc`.
///
/// Field names are serialized in camelCase, so `service_endpoint` is emitted
/// as `serviceEndpoint`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct AppViewService {
    /// Service identifier (`get_did_doc` sets it to `#slingshot`).
    id: String,
    /// Service type; a raw identifier because `type` is a Rust keyword.
    r#type: String,
    /// URL of the service (`get_did_doc` sets it to `https://{host}`).
    service_endpoint: String,
}
/// Minimal did document served by `get_did_doc`: the document id plus exactly
/// one service entry.
#[derive(Debug, Clone, Serialize)]
struct AppViewDoc {
    /// The document's DID (`get_did_doc` sets it to `did:web:{host}`).
    id: String,
    /// Fixed-size array: the document always carries a single service.
    service: [AppViewService; 1],
}
273/// Serve a did document for did:web for this to be an xrpc appview
274///
275/// No slingshot endpoints currently require auth, so it's not necessary to do
276/// service proxying, however clients may wish to:
277///
278/// - PDS proxying offers a level of client IP anonymity from slingshot
279/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future
280fn get_did_doc(host: &str) -> impl Endpoint + use<> {
281 let doc = poem::web::Json(AppViewDoc {
282 id: format!("did:web:{host}"),
283 service: [AppViewService {
284 id: "#slingshot".to_string(),
285 r#type: "SlingshotRecordProxy".to_string(),
286 service_endpoint: format!("https://{host}"),
287 }],
288 });
289 make_sync(move |_| doc.clone())
290}
291
292pub async fn serve(
293 cache: HybridCache<String, CachedRecord>,
294 identity: Identity,
295 repo: Repo,
296 host: Option<String>,
297 certs: Option<PathBuf>,
298 _shutdown: CancellationToken,
299) -> Result<(), ServerError> {
300 let repo = Arc::new(repo);
301 let api_service = OpenApiService::new(
302 Xrpc {
303 cache,
304 identity,
305 repo,
306 },
307 "Slingshot",
308 env!("CARGO_PKG_VERSION"),
309 )
310 .server("http://localhost:3000")
311 .url_prefix("/xrpc");
312
313 let mut app = Route::new()
314 .nest("/", api_service.scalar())
315 .nest("/openapi.json", api_service.spec_endpoint())
316 .nest("/xrpc/", api_service);
317
318 if let Some(host) = host {
319 rustls::crypto::aws_lc_rs::default_provider()
320 .install_default()
321 .expect("alskfjalksdjf");
322
323 app = app.at("/.well-known/did.json", get_did_doc(&host));
324
325 let mut auto_cert = AutoCert::builder()
326 .directory_url(LETS_ENCRYPT_PRODUCTION)
327 .domain(&host);
328 if let Some(certs) = certs {
329 auto_cert = auto_cert.cache_path(certs)
330 }
331 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?;
332
333 run(TcpListener::bind("0.0.0.0:443").acme(auto_cert), app).await
334 } else {
335 run(TcpListener::bind("127.0.0.1:3000"), app).await
336 }
337}
338
339async fn run<L>(listener: L, app: Route) -> Result<(), ServerError>
340where
341 L: Listener + 'static,
342{
343 let app = app
344 .with(
345 Cors::new()
346 .allow_origin_regex("*")
347 .allow_methods([Method::GET])
348 .allow_credentials(false),
349 )
350 .with(Tracing);
351 Server::new(listener)
352 .name("slingshot")
353 .run(app)
354 .await
355 .map_err(ServerError::ServerExited)
356}