forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use askama::Template;
2use axum::{
3 extract::{Query, Request},
4 http::{self, header},
5 middleware::{self, Next},
6 response::{IntoResponse, Response},
7 routing::get,
8 Router,
9};
10use axum_metrics::{ExtraMetricLabels, MetricLayer};
11use bincode::Options;
12use serde::{Deserialize, Serialize};
13use serde_with::serde_as;
14use std::collections::HashMap;
15use std::time::{Duration, UNIX_EPOCH};
16use tokio::net::{TcpListener, ToSocketAddrs};
17use tokio::task::block_in_place;
18use tokio_util::sync::CancellationToken;
19
20use crate::storage::{LinkReader, StorageStats};
21use crate::{CountsByCount, Did, RecordId};
22
23mod acceptable;
24mod filters;
25
26use acceptable::{acceptable, ExtractAccept};
27
/// Page size the paginated endpoints use when the caller omits `limit`.
const DEFAULT_CURSOR_LIMIT: u64 = 16;
/// Largest `limit` a caller may request; larger values are rejected with 400.
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;

/// Unix timestamp (seconds) treated as the start of indexing when the landing
/// page computes `days_indexed`.
// TODO: not this (hard-coded)
const INDEX_BEGAN_AT_TS: u64 = 1_738_083_600;
32
/// Build the HTTP API router over `store` and serve it on `addr` until
/// `stay_alive` is cancelled (graceful shutdown).
///
/// Each data route gets its own clone of `store`. Its synchronous handler runs
/// inside `tokio::task::block_in_place`, so the storage calls do not hold up
/// other tasks on the same async worker. Tokio documents that `block_in_place`
/// panics on a current-thread runtime, so this must run on a multi-threaded
/// runtime.
///
/// # Errors
/// Returns an error if binding the listener, reading its local address, or
/// running the server fails.
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
where
    S: LinkReader,
    A: ToSocketAddrs,
{
    let app = Router::new()
        .route("/robots.txt", get(robots))
        .route(
            "/",
            get({
                let store = store.clone();
                move |accept| async { block_in_place(|| hello(accept, store)) }
            }),
        )
        .route(
            "/links/count",
            get({
                let store = store.clone();
                move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
            }),
        )
        .route(
            "/links/count/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| count_distinct_dids(accept, query, store))
                }
            }),
        )
        .route(
            "/links",
            get({
                let store = store.clone();
                move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
            }),
        )
        .route(
            "/links/distinct-dids",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| get_distinct_dids(accept, query, store))
                }
            }),
        )
        .route(
            // deprecated (presumably superseded by `/links/all` below)
            "/links/all/count",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| count_all_links(accept, query, store))
                }
            }),
        )
        .route(
            "/links/all",
            get({
                let store = store.clone();
                move |accept, query| async {
                    block_in_place(|| explore_links(accept, query, store))
                }
            }),
        )
        // Each `.layer` wraps everything added before it, so `MetricLayer` is
        // the outermost layer and sees the response after `add_lables` has
        // attached its `ExtraMetricLabels` extension.
        .layer(tower_http::cors::CorsLayer::permissive())
        .layer(middleware::from_fn(add_lables))
        .layer(MetricLayer::default());

    let listener = TcpListener::bind(addr).await?;
    println!("api: listening at http://{:?}", listener.local_addr()?);
    axum::serve(listener, app)
        .with_graceful_shutdown(async move { stay_alive.cancelled().await })
        .await?;

    Ok(())
}
110
111async fn add_lables(request: Request, next: Next) -> Response {
112 let origin = request
113 .headers()
114 .get(header::ORIGIN)
115 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok());
116 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| {
117 ua.to_str()
118 .map(|v| {
119 if v.starts_with("Mozilla/") {
120 "Mozilla/...".into()
121 } else {
122 v.to_owned()
123 }
124 })
125 .ok()
126 });
127
128 let mut res = next.run(request).await;
129
130 let mut labels = Vec::new();
131 if let Some(o) = origin {
132 labels.push(metrics::Label::new("origin", o));
133 }
134 if let Some(ua) = user_agent {
135 labels.push(metrics::Label::new("user_agent", ua));
136 }
137 res.extensions_mut().insert(ExtraMetricLabels(labels));
138 res
139}
140
/// Body served at `/robots.txt`: asks every crawler to stay out of the
/// `/links` API endpoints.
///
/// Spelled out with explicit `\n`s so the response bytes do not depend on how
/// the surrounding source happens to be indented.
const ROBOTS_TXT: &str = "User-agent: *\nDisallow: /links\nDisallow: /links/\n";

/// `GET /robots.txt`
async fn robots() -> &'static str {
    ROBOTS_TXT
}
148
/// Landing-page payload for `GET /`.
///
/// Returned through `acceptable`, which presumably picks the `hello.html.j2`
/// template or a serialized form based on the `Accept` header (the `help`
/// text below suggests that).
// NOTE(review): "Reponse" is a typo of "Response"; rename once all references
// (possibly outside this file view) can be updated together.
#[derive(Template, Serialize, Deserialize)]
#[template(path = "hello.html.j2")]
struct HelloReponse {
    /// Hint telling non-HTML clients how to get the human-readable page.
    help: &'static str,
    /// Whole days elapsed since `INDEX_BEGAN_AT_TS`.
    days_indexed: u64,
    /// Statistics returned by `LinkReader::get_stats`.
    stats: StorageStats,
}
156fn hello(
157 accept: ExtractAccept,
158 store: impl LinkReader,
159) -> Result<impl IntoResponse, http::StatusCode> {
160 let stats = store
161 .get_stats()
162 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
163 let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
164 .elapsed()
165 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
166 .as_secs()
167 / 86400;
168 Ok(acceptable(accept, HelloReponse {
169 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
170 days_indexed,
171 stats,
172 }))
173}
174
/// Query string for `GET /links/count`.
///
/// All three fields are required: they are plain `String`s, so a request that
/// omits one is rejected by the `Query` extractor before the handler runs.
/// They are passed unchanged to `LinkReader::get_count`.
#[derive(Clone, Deserialize)]
struct GetLinksCountQuery {
    target: String,
    collection: String,
    path: String,
}
/// Response for `GET /links/count`.
#[derive(Template, Serialize)]
#[template(path = "links-count.html.j2")]
struct GetLinksCountResponse {
    /// Count returned by `LinkReader::get_count` for the query's
    /// target/collection/path.
    total: u64,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: GetLinksCountQuery,
}
188fn count_links(
189 accept: ExtractAccept,
190 query: Query<GetLinksCountQuery>,
191 store: impl LinkReader,
192) -> Result<impl IntoResponse, http::StatusCode> {
193 let total = store
194 .get_count(&query.target, &query.collection, &query.path)
195 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
196 Ok(acceptable(
197 accept,
198 GetLinksCountResponse {
199 total,
200 query: (*query).clone(),
201 },
202 ))
203}
204
/// Query string for `GET /links/count/distinct-dids`.
///
/// All three fields are required (plain `String`s; the `Query` extractor
/// rejects requests missing any of them) and are passed unchanged to
/// `LinkReader::get_distinct_did_count`.
#[derive(Clone, Deserialize)]
struct GetDidsCountQuery {
    target: String,
    collection: String,
    path: String,
}
/// Response for `GET /links/count/distinct-dids`.
#[derive(Template, Serialize)]
#[template(path = "dids-count.html.j2")]
struct GetDidsCountResponse {
    /// Count returned by `LinkReader::get_distinct_did_count`.
    total: u64,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: GetDidsCountQuery,
}
218fn count_distinct_dids(
219 accept: ExtractAccept,
220 query: Query<GetDidsCountQuery>,
221 store: impl LinkReader,
222) -> Result<impl IntoResponse, http::StatusCode> {
223 let total = store
224 .get_distinct_did_count(&query.target, &query.collection, &query.path)
225 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
226 Ok(acceptable(
227 accept,
228 GetDidsCountResponse {
229 total,
230 query: (*query).clone(),
231 },
232 ))
233}
234
/// Query string for `GET /links`.
#[derive(Clone, Deserialize)]
struct GetLinkItemsQuery {
    target: String,
    collection: String,
    path: String,
    /// Hex-encoded cursor taken from a previous response; omit for the first page.
    cursor: Option<OpaqueApiCursor>,
    /// Page size; defaults to `DEFAULT_CURSOR_LIMIT`, at most `DEFAULT_CURSOR_LIMIT_MAX`.
    limit: Option<u64>,
    // TODO: allow reverse (er, forward) order as well
}
/// One page of `GET /links` results.
#[derive(Template, Serialize)]
#[template(path = "links.html.j2")]
struct GetLinkItemsResponse {
    // what does staleness mean?
    // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
    // - links have been deleted. hmm.
    /// Total reported by the store for the whole result set, not just this page.
    total: u64,
    /// Linking records on this page.
    linking_records: Vec<RecordId>,
    /// Cursor for the next page; `None` when the store gave no next position.
    cursor: Option<OpaqueApiCursor>,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: GetLinkItemsQuery,
}
256fn get_links(
257 accept: ExtractAccept,
258 query: Query<GetLinkItemsQuery>,
259 store: impl LinkReader,
260) -> Result<impl IntoResponse, http::StatusCode> {
261 let until = query
262 .cursor
263 .clone()
264 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
265 .transpose()?
266 .map(|c| c.next);
267
268 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT);
269 if limit > DEFAULT_CURSOR_LIMIT_MAX {
270 return Err(http::StatusCode::BAD_REQUEST);
271 }
272
273 let paged = store
274 .get_links(&query.target, &query.collection, &query.path, limit, until)
275 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
276
277 let cursor = paged.next.map(|next| {
278 ApiCursor {
279 version: paged.version,
280 next,
281 }
282 .into()
283 });
284
285 Ok(acceptable(
286 accept,
287 GetLinkItemsResponse {
288 total: paged.total,
289 linking_records: paged.items,
290 cursor,
291 query: (*query).clone(),
292 },
293 ))
294}
295
/// Query string for `GET /links/distinct-dids`.
#[derive(Clone, Deserialize)]
struct GetDidItemsQuery {
    target: String,
    collection: String,
    path: String,
    /// Hex-encoded cursor taken from a previous response; omit for the first page.
    cursor: Option<OpaqueApiCursor>,
    /// Page size; defaults to `DEFAULT_CURSOR_LIMIT`, at most `DEFAULT_CURSOR_LIMIT_MAX`.
    limit: Option<u64>,
    // TODO: allow reverse (er, forward) order as well
}
/// One page of `GET /links/distinct-dids` results.
#[derive(Template, Serialize)]
#[template(path = "dids.html.j2")]
struct GetDidItemsResponse {
    // what does staleness mean?
    // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or,
    // - links have been deleted. hmm.
    /// Total reported by the store for the whole result set, not just this page.
    total: u64,
    /// Linking DIDs on this page.
    linking_dids: Vec<Did>,
    /// Cursor for the next page; `None` when the store gave no next position.
    cursor: Option<OpaqueApiCursor>,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: GetDidItemsQuery,
}
317fn get_distinct_dids(
318 accept: ExtractAccept,
319 query: Query<GetDidItemsQuery>,
320 store: impl LinkReader,
321) -> Result<impl IntoResponse, http::StatusCode> {
322 let until = query
323 .cursor
324 .clone()
325 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
326 .transpose()?
327 .map(|c| c.next);
328
329 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT);
330 if limit > DEFAULT_CURSOR_LIMIT_MAX {
331 return Err(http::StatusCode::BAD_REQUEST);
332 }
333
334 let paged = store
335 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until)
336 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
337
338 let cursor = paged.next.map(|next| {
339 ApiCursor {
340 version: paged.version,
341 next,
342 }
343 .into()
344 });
345
346 Ok(acceptable(
347 accept,
348 GetDidItemsResponse {
349 total: paged.total,
350 linking_dids: paged.items,
351 cursor,
352 query: (*query).clone(),
353 },
354 ))
355}
356
/// Query string for the deprecated `GET /links/all/count`; `target` is passed
/// unchanged to `LinkReader::get_all_record_counts`.
#[derive(Clone, Deserialize)]
struct GetAllLinksQuery {
    target: String,
}
/// Response for the deprecated `GET /links/all/count`.
#[derive(Template, Serialize)]
#[template(path = "links-all-count.html.j2")]
struct GetAllLinksResponse {
    /// Two-level map of counts as returned by `LinkReader::get_all_record_counts`.
    // NOTE(review): presumably collection -> path -> count; confirm against the store.
    links: HashMap<String, HashMap<String, u64>>,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: GetAllLinksQuery,
}
368fn count_all_links(
369 accept: ExtractAccept,
370 query: Query<GetAllLinksQuery>,
371 store: impl LinkReader,
372) -> Result<impl IntoResponse, http::StatusCode> {
373 let links = store
374 .get_all_record_counts(&query.target)
375 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
376 Ok(acceptable(
377 accept,
378 GetAllLinksResponse {
379 links,
380 query: (*query).clone(),
381 },
382 ))
383}
384
/// Query string for `GET /links/all`; `target` is passed unchanged to
/// `LinkReader::get_all_counts`.
#[derive(Clone, Deserialize)]
struct ExploreLinksQuery {
    target: String,
}
/// Response for `GET /links/all`.
#[derive(Template, Serialize)]
#[template(path = "explore-links.html.j2")]
struct ExploreLinksResponse {
    /// Two-level map of `CountsByCount` as returned by `LinkReader::get_all_counts`.
    // NOTE(review): presumably collection -> path -> counts; confirm against the store.
    links: HashMap<String, HashMap<String, CountsByCount>>,
    /// Echo of the request. Left out of serialized output; presumably kept for
    /// the HTML template.
    #[serde(skip_serializing)]
    query: ExploreLinksQuery,
}
396fn explore_links(
397 accept: ExtractAccept,
398 query: Query<ExploreLinksQuery>,
399 store: impl LinkReader,
400) -> Result<impl IntoResponse, http::StatusCode> {
401 let links = store
402 .get_all_counts(&query.target)
403 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
404 Ok(acceptable(
405 accept,
406 ExploreLinksResponse {
407 links,
408 query: (*query).clone(),
409 },
410 ))
411}
412
/// Pagination cursor as seen by API clients: raw bytes that serde reads and
/// writes as a hex string. The bytes are a bincode-encoded `ApiCursor`.
// `#[serde_as]` must stay above the derive so it can rewrite the field attribute.
#[serde_as]
#[derive(Clone, Serialize, Deserialize)] // for json
struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>);
416
/// Decoded contents of an `OpaqueApiCursor`.
///
/// bincode encodes fields positionally, so field order and types are part of
/// the cursor format: changing them invalidates cursors already handed out.
#[derive(Serialize, Deserialize)] // for bincode
struct ApiCursor {
    version: (u64, u64), // (collection length, deleted item count)
    /// Resume position; the handlers pass it to the store as `until`.
    next: u64,
}
422
423impl TryFrom<OpaqueApiCursor> for ApiCursor {
424 type Error = bincode::Error;
425
426 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
427 bincode::DefaultOptions::new().deserialize(&item.0)
428 }
429}
430
431impl From<ApiCursor> for OpaqueApiCursor {
432 fn from(item: ApiCursor) -> Self {
433 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
434 }
435}