// Forked from microcosm.blue/microcosm-rs.
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm.
1use askama::Template;
2use axum::{
3 extract::{Query, Request},
4 http::{self, header},
5 middleware::{self, Next},
6 response::{IntoResponse, Response},
7 routing::get,
8 Router,
9};
10use axum_metrics::{ExtraMetricLabels, MetricLayer};
11use bincode::Options;
12use serde::{Deserialize, Serialize};
13use serde_with::serde_as;
14use std::collections::{HashMap, HashSet};
15use std::time::{Duration, UNIX_EPOCH};
16use tokio::net::{TcpListener, ToSocketAddrs};
17use tokio::task::block_in_place;
18use tokio_util::sync::CancellationToken;
19
20use crate::storage::{LinkReader, StorageStats};
21use crate::{CountsByCount, Did, RecordId};
22
23mod acceptable;
24mod filters;
25
26use acceptable::{acceptable, ExtractAccept};
27
/// Page size used by paginated endpoints when the request does not specify `limit`.
const DEFAULT_CURSOR_LIMIT: u64 = 16;
/// Largest `limit` a paginated endpoint accepts; larger values are rejected with
/// `400 Bad Request`.
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;

/// Unix timestamp (seconds) treated as the start of indexing: 2025-01-28T17:00:00Z.
/// `hello` uses it to report how many whole days the index has been running.
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
32
/// Builds the HTTP API router over `store` and serves it on `addr` until `stay_alive` is
/// cancelled (graceful shutdown).
///
/// Every route except `/robots.txt` runs a synchronous handler inside
/// [`tokio::task::block_in_place`], presumably because [`LinkReader`] calls block.
/// NOTE: `block_in_place` panics on a current-thread tokio runtime, so this must be driven by
/// the multi-threaded runtime.
///
/// Each route closure owns its own clone of `store`.
///
/// # Errors
/// Returns an error if binding `addr` fails, if the bound address cannot be read back, or if
/// the server itself fails while running.
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
where
    S: LinkReader,
    A: ToSocketAddrs,
{
    let app = Router::new()
        .route("/robots.txt", get(robots))
        .route(
            "/",
            get({
                let store = store.clone();
                move |accept| async { block_in_place(|| hello(accept, store)) }
            }),
        )
        .route(
            "/links/count",
            get({
                let store = store.clone();
                move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
            }),
        )
        .route(
            "/links/count/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| count_distinct_dids(accept, query, store))
                }
            }),
        )
        .route(
            "/links",
            get({
                let store = store.clone();
                move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
            }),
        )
        .route(
            "/links/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| get_distinct_dids(accept, query, store))
                }
            }),
        )
        .route(
            // deprecated
            "/links/all/count",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| count_all_links(accept, query, store))
                }
            }),
        )
        .route(
            "/links/all",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| explore_links(accept, query, store))
                }
            }),
        )
        // Layers added later wrap the earlier ones, so `MetricLayer` is outermost. It therefore
        // sees the `ExtraMetricLabels` response extension that `add_lables` inserts.
        .layer(tower_http::cors::CorsLayer::permissive())
        .layer(middleware::from_fn(add_lables))
        .layer(MetricLayer::default());

    let listener = TcpListener::bind(addr).await?;
    println!("api: listening at http://{:?}", listener.local_addr()?);
    axum::serve(listener, app)
        // Stop accepting connections once the shared cancellation token fires.
        .with_graceful_shutdown(async move { stay_alive.cancelled().await })
        .await?;

    Ok(())
}
110
111async fn add_lables(request: Request, next: Next) -> Response {
112 let origin = request
113 .headers()
114 .get(header::ORIGIN)
115 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok());
116 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| {
117 ua.to_str()
118 .map(|v| {
119 if v.starts_with("Mozilla/") {
120 "Mozilla/...".into()
121 } else {
122 v.to_owned()
123 }
124 })
125 .ok()
126 });
127
128 let mut res = next.run(request).await;
129
130 let mut labels = Vec::new();
131 if let Some(o) = origin {
132 labels.push(metrics::Label::new("origin", o));
133 }
134 if let Some(ua) = user_agent {
135 labels.push(metrics::Label::new("user_agent", ua));
136 }
137 res.extensions_mut().insert(ExtraMetricLabels(labels));
138 res
139}
140
/// Serves `/robots.txt`, asking every crawler (`User-agent: *`) to stay out of `/links` and
/// `/links/`.
async fn robots() -> &'static str {
    "\
User-agent: *
Disallow: /links
Disallow: /links/
    "
}
148
/// Response body for `GET /`.
///
/// It is rendered through `hello.html.j2` or serialized, depending on the request's `Accept`
/// header (via `acceptable`).
#[derive(Template, Serialize, Deserialize)]
#[template(path = "hello.html.j2")]
struct HelloReponse {
    /// Hint for non-browser clients on how to reach the human-readable documentation.
    help: &'static str,
    /// Whole days elapsed since `INDEX_BEGAN_AT_TS`.
    days_indexed: u64,
    /// Statistics returned by `LinkReader::get_stats`.
    stats: StorageStats,
}
156fn hello(
157 accept: ExtractAccept,
158 store: impl LinkReader,
159) -> Result<impl IntoResponse, http::StatusCode> {
160 let stats = store
161 .get_stats()
162 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
163 let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
164 .elapsed()
165 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
166 .as_secs()
167 / 86400;
168 Ok(acceptable(accept, HelloReponse {
169 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
170 days_indexed,
171 stats,
172 }))
173}
174
/// Query parameters for `GET /links/count`.
#[derive(Clone, Deserialize)]
struct GetLinksCountQuery {
    /// Link target to count references to.
    target: String,
    /// Collection to match, passed through to `LinkReader::get_count`.
    collection: String,
    /// Path to match, passed through to `LinkReader::get_count`.
    path: String,
}
/// Response for `GET /links/count`.
#[derive(Template, Serialize)]
#[template(path = "links-count.html.j2")]
struct GetLinksCountResponse {
    /// Count returned by the store.
    total: u64,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: GetLinksCountQuery,
}
188fn count_links(
189 accept: ExtractAccept,
190 query: Query<GetLinksCountQuery>,
191 store: impl LinkReader,
192) -> Result<impl IntoResponse, http::StatusCode> {
193 let total = store
194 .get_count(&query.target, &query.collection, &query.path)
195 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
196 Ok(acceptable(
197 accept,
198 GetLinksCountResponse {
199 total,
200 query: (*query).clone(),
201 },
202 ))
203}
204
/// Query parameters for `GET /links/count/distinct-dids`.
#[derive(Clone, Deserialize)]
struct GetDidsCountQuery {
    /// Link target whose linking DIDs are counted.
    target: String,
    /// Collection to match, passed through to `LinkReader::get_distinct_did_count`.
    collection: String,
    /// Path to match, passed through to `LinkReader::get_distinct_did_count`.
    path: String,
}
/// Response for `GET /links/count/distinct-dids`.
#[derive(Template, Serialize)]
#[template(path = "dids-count.html.j2")]
struct GetDidsCountResponse {
    /// Distinct-DID count returned by the store.
    total: u64,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: GetDidsCountQuery,
}
218fn count_distinct_dids(
219 accept: ExtractAccept,
220 query: Query<GetDidsCountQuery>,
221 store: impl LinkReader,
222) -> Result<impl IntoResponse, http::StatusCode> {
223 let total = store
224 .get_distinct_did_count(&query.target, &query.collection, &query.path)
225 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
226 Ok(acceptable(
227 accept,
228 GetDidsCountResponse {
229 total,
230 query: (*query).clone(),
231 },
232 ))
233}
234
/// Query parameters for `GET /links`.
///
/// `get_links` extracts this with `axum_extra::extract::Query`, which accepts repeated
/// parameters (needed for `did`).
#[derive(Clone, Deserialize)]
struct GetLinkItemsQuery {
    /// Link target whose incoming links are listed.
    target: String,
    /// Collection to match, passed through to `LinkReader::get_links`.
    collection: String,
    /// Path to match, passed through to `LinkReader::get_links`.
    path: String,
    /// Opaque (hex) pagination cursor taken from a previous page's response.
    cursor: Option<OpaqueApiCursor>,
    /// Filter links only from these DIDs
    ///
    /// include multiple times to filter by multiple source DIDs
    #[serde(default)]
    did: Vec<String>,
    /// (deprecated) Filter links only from these DIDs
    ///
    /// format: comma-separated sequence of DIDs
    ///
    /// errors: if the `did` parameter is also present with at least one non-empty value
    ///
    /// deprecated: use `did`, which can be repeated multiple times
    from_dids: Option<String>, // comma separated: gross
    /// Page size. Defaults to `DEFAULT_CURSOR_LIMIT` (see [`get_default_limit`]); values above
    /// `DEFAULT_CURSOR_LIMIT_MAX` are rejected by the handler.
    #[serde(default = "get_default_limit")]
    limit: u64,
    // TODO: allow reverse (er, forward) order as well
}
/// Serde default for [`GetLinkItemsQuery::limit`].
fn get_default_limit() -> u64 {
    DEFAULT_CURSOR_LIMIT
}
/// One page of `GET /links` results.
#[derive(Template, Serialize)]
#[template(path = "links.html.j2")]
struct GetLinkItemsResponse {
    // what does staleness mean?
    // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
    // - links have been deleted. hmm.
    /// `total` from the store's paged result (presumably the count across all pages).
    total: u64,
    /// The linking records on this page.
    linking_records: Vec<RecordId>,
    /// Cursor for the next page; `None` when the store reports no next position.
    cursor: Option<OpaqueApiCursor>,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: GetLinkItemsQuery,
}
273fn get_links(
274 accept: ExtractAccept,
275 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences
276 store: impl LinkReader,
277) -> Result<impl IntoResponse, http::StatusCode> {
278 let until = query
279 .cursor
280 .clone()
281 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
282 .transpose()?
283 .map(|c| c.next);
284
285 let limit = query.limit;
286 if limit > DEFAULT_CURSOR_LIMIT_MAX {
287 return Err(http::StatusCode::BAD_REQUEST);
288 }
289
290 let mut filter_dids: HashSet<Did> = HashSet::from_iter(
291 query
292 .did
293 .iter()
294 .map(|d| d.trim())
295 .filter(|d| !d.is_empty())
296 .map(|d| Did(d.to_string())),
297 );
298
299 if let Some(comma_joined) = &query.from_dids {
300 if !filter_dids.is_empty() {
301 return Err(http::StatusCode::BAD_REQUEST);
302 }
303 for did in comma_joined.split(',') {
304 filter_dids.insert(Did(did.to_string()));
305 }
306 }
307
308 let paged = store
309 .get_links(
310 &query.target,
311 &query.collection,
312 &query.path,
313 limit,
314 until,
315 &filter_dids,
316 )
317 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
318
319 let cursor = paged.next.map(|next| {
320 ApiCursor {
321 version: paged.version,
322 next,
323 }
324 .into()
325 });
326
327 Ok(acceptable(
328 accept,
329 GetLinkItemsResponse {
330 total: paged.total,
331 linking_records: paged.items,
332 cursor,
333 query: (*query).clone(),
334 },
335 ))
336}
337
/// Query parameters for `GET /links/distinct-dids`.
#[derive(Clone, Deserialize)]
struct GetDidItemsQuery {
    /// Link target whose linking DIDs are listed.
    target: String,
    /// Collection to match, passed through to `LinkReader::get_distinct_dids`.
    collection: String,
    /// Path to match, passed through to `LinkReader::get_distinct_dids`.
    path: String,
    /// Opaque (hex) pagination cursor taken from a previous page's response.
    cursor: Option<OpaqueApiCursor>,
    /// Page size. The handler uses `DEFAULT_CURSOR_LIMIT` when this is absent and rejects
    /// values above `DEFAULT_CURSOR_LIMIT_MAX`.
    limit: Option<u64>,
    // TODO: allow reverse (er, forward) order as well
}
/// One page of `GET /links/distinct-dids` results.
#[derive(Template, Serialize)]
#[template(path = "dids.html.j2")]
struct GetDidItemsResponse {
    // what does staleness mean?
    // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
    // - links have been deleted. hmm.
    /// `total` from the store's paged result (presumably the count across all pages).
    total: u64,
    /// The distinct linking DIDs on this page.
    linking_dids: Vec<Did>,
    /// Cursor for the next page; `None` when the store reports no next position.
    cursor: Option<OpaqueApiCursor>,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: GetDidItemsQuery,
}
359fn get_distinct_dids(
360 accept: ExtractAccept,
361 query: Query<GetDidItemsQuery>,
362 store: impl LinkReader,
363) -> Result<impl IntoResponse, http::StatusCode> {
364 let until = query
365 .cursor
366 .clone()
367 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
368 .transpose()?
369 .map(|c| c.next);
370
371 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT);
372 if limit > DEFAULT_CURSOR_LIMIT_MAX {
373 return Err(http::StatusCode::BAD_REQUEST);
374 }
375
376 let paged = store
377 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until)
378 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
379
380 let cursor = paged.next.map(|next| {
381 ApiCursor {
382 version: paged.version,
383 next,
384 }
385 .into()
386 });
387
388 Ok(acceptable(
389 accept,
390 GetDidItemsResponse {
391 total: paged.total,
392 linking_dids: paged.items,
393 cursor,
394 query: (*query).clone(),
395 },
396 ))
397}
398
/// Query parameters for the deprecated `GET /links/all/count`.
#[derive(Clone, Deserialize)]
struct GetAllLinksQuery {
    /// Link target whose incoming links are counted.
    target: String,
}
/// Response for the deprecated `GET /links/all/count`.
#[derive(Template, Serialize)]
#[template(path = "links-all-count.html.j2")]
struct GetAllLinksResponse {
    /// Nested count maps from `LinkReader::get_all_record_counts`. The keys are presumably
    /// collection, then path; confirm against the storage implementation.
    links: HashMap<String, HashMap<String, u64>>,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: GetAllLinksQuery,
}
410fn count_all_links(
411 accept: ExtractAccept,
412 query: Query<GetAllLinksQuery>,
413 store: impl LinkReader,
414) -> Result<impl IntoResponse, http::StatusCode> {
415 let links = store
416 .get_all_record_counts(&query.target)
417 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
418 Ok(acceptable(
419 accept,
420 GetAllLinksResponse {
421 links,
422 query: (*query).clone(),
423 },
424 ))
425}
426
/// Query parameters for `GET /links/all`.
#[derive(Clone, Deserialize)]
struct ExploreLinksQuery {
    /// Link target whose incoming links are summarized.
    target: String,
}
/// Response for `GET /links/all`.
#[derive(Template, Serialize)]
#[template(path = "explore-links.html.j2")]
struct ExploreLinksResponse {
    /// Nested [`CountsByCount`] maps from `LinkReader::get_all_counts`. The keys are
    /// presumably collection, then path; confirm against the storage implementation.
    links: HashMap<String, HashMap<String, CountsByCount>>,
    /// The request's query. It is omitted from serialized output and is presumably used only
    /// by the HTML template.
    #[serde(skip_serializing)]
    query: ExploreLinksQuery,
}
438fn explore_links(
439 accept: ExtractAccept,
440 query: Query<ExploreLinksQuery>,
441 store: impl LinkReader,
442) -> Result<impl IntoResponse, http::StatusCode> {
443 let links = store
444 .get_all_counts(&query.target)
445 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
446 Ok(acceptable(
447 accept,
448 ExploreLinksResponse {
449 links,
450 query: (*query).clone(),
451 },
452 ))
453}
454
455#[serde_as]
456#[derive(Clone, Serialize, Deserialize)] // for json
457struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>);
458
459#[derive(Serialize, Deserialize)] // for bincode
460struct ApiCursor {
461 version: (u64, u64), // (collection length, deleted item count)
462 next: u64,
463}
464
465impl TryFrom<OpaqueApiCursor> for ApiCursor {
466 type Error = bincode::Error;
467
468 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
469 bincode::DefaultOptions::new().deserialize(&item.0)
470 }
471}
472
473impl From<ApiCursor> for OpaqueApiCursor {
474 fn from(item: ApiCursor) -> Self {
475 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
476 }
477}