Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use askama::Template; 2use axum::{ 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 response::{IntoResponse, Response}, 7 routing::get, 8 Router, 9}; 10use axum_metrics::{ExtraMetricLabels, MetricLayer}; 11use bincode::Options; 12use serde::{Deserialize, Serialize}; 13use serde_with::serde_as; 14use std::collections::HashMap; 15use std::time::{Duration, UNIX_EPOCH}; 16use tokio::net::{TcpListener, ToSocketAddrs}; 17use tokio::task::block_in_place; 18use tokio_util::sync::CancellationToken; 19 20use crate::storage::{LinkReader, StorageStats}; 21use crate::{CountsByCount, Did, RecordId}; 22 23mod acceptable; 24mod filters; 25 26use acceptable::{acceptable, ExtractAccept}; 27 28const DEFAULT_CURSOR_LIMIT: u64 = 16; 29const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100; 30 31const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this 32 33pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 34where 35 S: LinkReader, 36 A: ToSocketAddrs, 37{ 38 let app = Router::new() 39 .route("/robots.txt", get(robots)) 40 .route( 41 "/", 42 get({ 43 let store = store.clone(); 44 move |accept| async { block_in_place(|| hello(accept, store)) } 45 }), 46 ) 47 .route( 48 "/links/count", 49 get({ 50 let store = store.clone(); 51 move |accept, query| async { block_in_place(|| count_links(accept, query, store)) } 52 }), 53 ) 54 .route( 55 "/links/count/distinct-dids", 56 get({ 57 let store = store.clone(); 58 move |accept, query| async { 59 block_in_place(|| count_distinct_dids(accept, query, store)) 60 } 61 }), 62 ) 63 .route( 64 "/links", 65 get({ 66 let store = store.clone(); 67 move |accept, query| async { block_in_place(|| get_links(accept, query, store)) } 68 }), 69 ) 70 .route( 71 "/links/distinct-dids", 72 get({ 73 let store = store.clone(); 74 move |accept, query| async { 75 block_in_place(|| get_distinct_dids(accept, query, store)) 76 } 77 }), 78 ) 79 .route( 80 // deprecated 81 "/links/all/count", 82 get({ 83 let store = store.clone(); 84 move |accept, query| async { 85 block_in_place(|| count_all_links(accept, query, store)) 86 } 87 }), 88 ) 89 .route( 90 "/links/all", 91 get({ 92 let store = store.clone(); 93 move |accept, query| async { 94 block_in_place(|| explore_links(accept, query, store)) 95 } 96 }), 97 ) 98 .layer(tower_http::cors::CorsLayer::permissive()) 99 .layer(middleware::from_fn(add_lables)) 100 .layer(MetricLayer::default()); 101 102 let listener = TcpListener::bind(addr).await?; 103 println!("api: listening at http://{:?}", listener.local_addr()?); 104 axum::serve(listener, app) 105 .with_graceful_shutdown(async move { stay_alive.cancelled().await }) 106 .await?; 107 108 Ok(()) 109} 110 111async fn add_lables(request: Request, next: Next) -> Response { 112 let origin = request 113 .headers() 114 .get(header::ORIGIN) 115 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok()); 116 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| { 117 ua.to_str() 118 .map(|v| { 119 if v.starts_with("Mozilla/") { 120 "Mozilla/...".into() 121 } else { 122 v.to_owned() 123 } 124 }) 125 .ok() 126 }); 127 128 let mut res = next.run(request).await; 129 130 let mut labels = Vec::new(); 131 if let Some(o) = origin { 132 labels.push(metrics::Label::new("origin", o)); 133 } 134 if let Some(ua) = user_agent { 135 labels.push(metrics::Label::new("user_agent", ua)); 136 } 137 res.extensions_mut().insert(ExtraMetricLabels(labels)); 138 res 139} 140 141async fn robots() -> &'static str { 142 "\ 143User-agent: * 144Disallow: /links 145Disallow: /links/ 146 " 147} 148 149#[derive(Template, Serialize, Deserialize)] 150#[template(path = "hello.html.j2")] 151struct HelloReponse { 152 help: &'static str, 153 days_indexed: u64, 154 stats: StorageStats, 155} 156fn hello( 157 accept: ExtractAccept, 158 store: impl LinkReader, 159) -> Result<impl IntoResponse, http::StatusCode> { 160 let stats = store 161 .get_stats() 162 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 163 let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS)) 164 .elapsed() 165 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 166 .as_secs() 167 / 86400; 168 Ok(acceptable(accept, HelloReponse { 169 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 170 days_indexed, 171 stats, 172 })) 173} 174 175#[derive(Clone, Deserialize)] 176struct GetLinksCountQuery { 177 target: String, 178 collection: String, 179 path: String, 180} 181#[derive(Template, Serialize)] 182#[template(path = "links-count.html.j2")] 183struct GetLinksCountResponse { 184 total: u64, 185 #[serde(skip_serializing)] 186 query: GetLinksCountQuery, 187} 188fn count_links( 189 accept: ExtractAccept, 190 query: Query<GetLinksCountQuery>, 191 store: impl LinkReader, 192) -> Result<impl IntoResponse, http::StatusCode> { 193 let total = store 194 .get_count(&query.target, &query.collection, &query.path) 195 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 196 Ok(acceptable( 197 accept, 198 GetLinksCountResponse { 199 total, 200 query: (*query).clone(), 201 }, 202 )) 203} 204 205#[derive(Clone, Deserialize)] 206struct GetDidsCountQuery { 207 target: String, 208 collection: String, 209 path: String, 210} 211#[derive(Template, Serialize)] 212#[template(path = "dids-count.html.j2")] 213struct GetDidsCountResponse { 214 total: u64, 215 #[serde(skip_serializing)] 216 query: GetDidsCountQuery, 217} 218fn count_distinct_dids( 219 accept: ExtractAccept, 220 query: Query<GetDidsCountQuery>, 221 store: impl LinkReader, 222) -> Result<impl IntoResponse, http::StatusCode> { 223 let total = store 224 .get_distinct_did_count(&query.target, &query.collection, &query.path) 225 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 226 Ok(acceptable( 227 accept, 228 GetDidsCountResponse { 229 total, 230 query: (*query).clone(), 231 }, 232 )) 233} 234 235#[derive(Clone, Deserialize)] 236struct GetLinkItemsQuery { 237 target: String, 238 collection: String, 239 path: String, 240 cursor: Option<OpaqueApiCursor>, 241 limit: Option<u64>, 242 // TODO: allow reverse (er, forward) order as well 243} 244#[derive(Template, Serialize)] 245#[template(path = "links.html.j2")] 246struct GetLinkItemsResponse { 247 // what does staleness mean? 248 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 249 // - links have been deleted. hmm. 250 total: u64, 251 linking_records: Vec<RecordId>, 252 cursor: Option<OpaqueApiCursor>, 253 #[serde(skip_serializing)] 254 query: GetLinkItemsQuery, 255} 256fn get_links( 257 accept: ExtractAccept, 258 query: Query<GetLinkItemsQuery>, 259 store: impl LinkReader, 260) -> Result<impl IntoResponse, http::StatusCode> { 261 let until = query 262 .cursor 263 .clone() 264 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 265 .transpose()? 266 .map(|c| c.next); 267 268 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 269 if limit > DEFAULT_CURSOR_LIMIT_MAX { 270 return Err(http::StatusCode::BAD_REQUEST); 271 } 272 273 let paged = store 274 .get_links(&query.target, &query.collection, &query.path, limit, until) 275 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 276 277 let cursor = paged.next.map(|next| { 278 ApiCursor { 279 version: paged.version, 280 next, 281 } 282 .into() 283 }); 284 285 Ok(acceptable( 286 accept, 287 GetLinkItemsResponse { 288 total: paged.total, 289 linking_records: paged.items, 290 cursor, 291 query: (*query).clone(), 292 }, 293 )) 294} 295 296#[derive(Clone, Deserialize)] 297struct GetDidItemsQuery { 298 target: String, 299 collection: String, 300 path: String, 301 cursor: Option<OpaqueApiCursor>, 302 limit: Option<u64>, 303 // TODO: allow reverse (er, forward) order as well 304} 305#[derive(Template, Serialize)] 306#[template(path = "dids.html.j2")] 307struct GetDidItemsResponse { 308 // what does staleness mean? 309 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 310 // - links have been deleted. hmm. 311 total: u64, 312 linking_dids: Vec<Did>, 313 cursor: Option<OpaqueApiCursor>, 314 #[serde(skip_serializing)] 315 query: GetDidItemsQuery, 316} 317fn get_distinct_dids( 318 accept: ExtractAccept, 319 query: Query<GetDidItemsQuery>, 320 store: impl LinkReader, 321) -> Result<impl IntoResponse, http::StatusCode> { 322 let until = query 323 .cursor 324 .clone() 325 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 326 .transpose()? 327 .map(|c| c.next); 328 329 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 330 if limit > DEFAULT_CURSOR_LIMIT_MAX { 331 return Err(http::StatusCode::BAD_REQUEST); 332 } 333 334 let paged = store 335 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until) 336 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 337 338 let cursor = paged.next.map(|next| { 339 ApiCursor { 340 version: paged.version, 341 next, 342 } 343 .into() 344 }); 345 346 Ok(acceptable( 347 accept, 348 GetDidItemsResponse { 349 total: paged.total, 350 linking_dids: paged.items, 351 cursor, 352 query: (*query).clone(), 353 }, 354 )) 355} 356 357#[derive(Clone, Deserialize)] 358struct GetAllLinksQuery { 359 target: String, 360} 361#[derive(Template, Serialize)] 362#[template(path = "links-all-count.html.j2")] 363struct GetAllLinksResponse { 364 links: HashMap<String, HashMap<String, u64>>, 365 #[serde(skip_serializing)] 366 query: GetAllLinksQuery, 367} 368fn count_all_links( 369 accept: ExtractAccept, 370 query: Query<GetAllLinksQuery>, 371 store: impl LinkReader, 372) -> Result<impl IntoResponse, http::StatusCode> { 373 let links = store 374 .get_all_record_counts(&query.target) 375 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 376 Ok(acceptable( 377 accept, 378 GetAllLinksResponse { 379 links, 380 query: (*query).clone(), 381 }, 382 )) 383} 384 385#[derive(Clone, Deserialize)] 386struct ExploreLinksQuery { 387 target: String, 388} 389#[derive(Template, Serialize)] 390#[template(path = "explore-links.html.j2")] 391struct ExploreLinksResponse { 392 links: HashMap<String, HashMap<String, CountsByCount>>, 393 #[serde(skip_serializing)] 394 query: ExploreLinksQuery, 395} 396fn explore_links( 397 accept: ExtractAccept, 398 query: Query<ExploreLinksQuery>, 399 store: impl LinkReader, 400) -> Result<impl IntoResponse, http::StatusCode> { 401 let links = store 402 .get_all_counts(&query.target) 403 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 404 Ok(acceptable( 405 accept, 406 ExploreLinksResponse { 407 links, 408 query: (*query).clone(), 409 }, 410 )) 411} 412 413#[serde_as] 414#[derive(Clone, Serialize, Deserialize)] // for json 415struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>); 416 417#[derive(Serialize, Deserialize)] // for bincode 418struct ApiCursor { 419 version: (u64, u64), // (collection length, deleted item count) 420 next: u64, 421} 422 423impl TryFrom<OpaqueApiCursor> for ApiCursor { 424 type Error = bincode::Error; 425 426 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 427 bincode::DefaultOptions::new().deserialize(&item.0) 428 } 429} 430 431impl From<ApiCursor> for OpaqueApiCursor { 432 fn from(item: ApiCursor) -> Self { 433 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 434 } 435}