Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use askama::Template; 2use axum::{ 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 response::{IntoResponse, Response}, 7 routing::get, 8 Router, 9}; 10use axum_metrics::{ExtraMetricLabels, MetricLayer}; 11use bincode::Options; 12use serde::{Deserialize, Serialize}; 13use serde_with::serde_as; 14use std::collections::{HashMap, HashSet}; 15use std::time::{Duration, UNIX_EPOCH}; 16use tokio::net::{TcpListener, ToSocketAddrs}; 17use tokio::task::block_in_place; 18use tokio_util::sync::CancellationToken; 19 20use crate::storage::{LinkReader, StorageStats}; 21use crate::{CountsByCount, Did, RecordId}; 22 23mod acceptable; 24mod filters; 25 26use acceptable::{acceptable, ExtractAccept}; 27 28const DEFAULT_CURSOR_LIMIT: u64 = 16; 29const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100; 30 31const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this 32 33pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 34where 35 S: LinkReader, 36 A: ToSocketAddrs, 37{ 38 let app = Router::new() 39 .route("/robots.txt", get(robots)) 40 .route( 41 "/", 42 get({ 43 let store = store.clone(); 44 move |accept| async { block_in_place(|| hello(accept, store)) } 45 }), 46 ) 47 .route( 48 "/links/count", 49 get({ 50 let store = store.clone(); 51 move |accept, query| async { block_in_place(|| count_links(accept, query, store)) } 52 }), 53 ) 54 .route( 55 "/links/count/distinct-dids", 56 get({ 57 let store = store.clone(); 58 move |accept, query| async { 59 block_in_place(|| count_distinct_dids(accept, query, store)) 60 } 61 }), 62 ) 63 .route( 64 "/links", 65 get({ 66 let store = store.clone(); 67 move |accept, query| async { block_in_place(|| get_links(accept, query, store)) } 68 }), 69 ) 70 .route( 71 "/links/distinct-dids", 72 get({ 73 let store = store.clone(); 74 move |accept, query| async { 75 block_in_place(|| get_distinct_dids(accept, query, store)) 76 } 77 }), 78 ) 79 .route( 80 // deprecated 81 "/links/all/count", 82 get({ 83 let store = store.clone(); 84 move |accept, query| async { 85 block_in_place(|| count_all_links(accept, query, store)) 86 } 87 }), 88 ) 89 .route( 90 "/links/all", 91 get({ 92 let store = store.clone(); 93 move |accept, query| async { 94 block_in_place(|| explore_links(accept, query, store)) 95 } 96 }), 97 ) 98 .layer(tower_http::cors::CorsLayer::permissive()) 99 .layer(middleware::from_fn(add_lables)) 100 .layer(MetricLayer::default()); 101 102 let listener = TcpListener::bind(addr).await?; 103 println!("api: listening at http://{:?}", listener.local_addr()?); 104 axum::serve(listener, app) 105 .with_graceful_shutdown(async move { stay_alive.cancelled().await }) 106 .await?; 107 108 Ok(()) 109} 110 111async fn add_lables(request: Request, next: Next) -> Response { 112 let origin = request 113 .headers() 114 .get(header::ORIGIN) 115 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok()); 116 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| { 117 ua.to_str() 118 .map(|v| { 119 if v.starts_with("Mozilla/") { 120 "Mozilla/...".into() 121 } else { 122 v.to_owned() 123 } 124 }) 125 .ok() 126 }); 127 128 let mut res = next.run(request).await; 129 130 let mut labels = Vec::new(); 131 if let Some(o) = origin { 132 labels.push(metrics::Label::new("origin", o)); 133 } 134 if let Some(ua) = user_agent { 135 labels.push(metrics::Label::new("user_agent", ua)); 136 } 137 res.extensions_mut().insert(ExtraMetricLabels(labels)); 138 res 139} 140 141async fn robots() -> &'static str { 142 "\ 143User-agent: * 144Disallow: /links 145Disallow: /links/ 146 " 147} 148 149#[derive(Template, Serialize, Deserialize)] 150#[template(path = "hello.html.j2")] 151struct HelloReponse { 152 help: &'static str, 153 days_indexed: u64, 154 stats: StorageStats, 155} 156fn hello( 157 accept: ExtractAccept, 158 store: impl LinkReader, 159) -> Result<impl IntoResponse, http::StatusCode> { 160 let stats = store 161 .get_stats() 162 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 163 let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS)) 164 .elapsed() 165 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 166 .as_secs() 167 / 86400; 168 Ok(acceptable(accept, HelloReponse { 169 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 170 days_indexed, 171 stats, 172 })) 173} 174 175#[derive(Clone, Deserialize)] 176struct GetLinksCountQuery { 177 target: String, 178 collection: String, 179 path: String, 180} 181#[derive(Template, Serialize)] 182#[template(path = "links-count.html.j2")] 183struct GetLinksCountResponse { 184 total: u64, 185 #[serde(skip_serializing)] 186 query: GetLinksCountQuery, 187} 188fn count_links( 189 accept: ExtractAccept, 190 query: Query<GetLinksCountQuery>, 191 store: impl LinkReader, 192) -> Result<impl IntoResponse, http::StatusCode> { 193 let total = store 194 .get_count(&query.target, &query.collection, &query.path) 195 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 196 Ok(acceptable( 197 accept, 198 GetLinksCountResponse { 199 total, 200 query: (*query).clone(), 201 }, 202 )) 203} 204 205#[derive(Clone, Deserialize)] 206struct GetDidsCountQuery { 207 target: String, 208 collection: String, 209 path: String, 210} 211#[derive(Template, Serialize)] 212#[template(path = "dids-count.html.j2")] 213struct GetDidsCountResponse { 214 total: u64, 215 #[serde(skip_serializing)] 216 query: GetDidsCountQuery, 217} 218fn count_distinct_dids( 219 accept: ExtractAccept, 220 query: Query<GetDidsCountQuery>, 221 store: impl LinkReader, 222) -> Result<impl IntoResponse, http::StatusCode> { 223 let total = store 224 .get_distinct_did_count(&query.target, &query.collection, &query.path) 225 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 226 Ok(acceptable( 227 accept, 228 GetDidsCountResponse { 229 total, 230 query: (*query).clone(), 231 }, 232 )) 233} 234 235#[derive(Clone, Deserialize)] 236struct GetLinkItemsQuery { 237 target: String, 238 collection: String, 239 path: String, 240 cursor: Option<OpaqueApiCursor>, 241 /// Filter links only from these DIDs 242 /// 243 /// include multiple times to filter by multiple source DIDs 244 #[serde(default)] 245 did: Vec<String>, 246 /// [deprecated] Filter links only from these DIDs 247 /// 248 /// format: comma-separated sequence of DIDs 249 /// 250 /// errors: if `did` parameter is also present 251 /// 252 /// deprecated: use `did`, which can be repeated multiple times 253 from_dids: Option<String>, // comma separated: gross 254 #[serde(default = "get_default_limit")] 255 limit: u64, 256 // TODO: allow reverse (er, forward) order as well 257} 258fn get_default_limit() -> u64 { 259 DEFAULT_CURSOR_LIMIT 260} 261#[derive(Template, Serialize)] 262#[template(path = "links.html.j2")] 263struct GetLinkItemsResponse { 264 // what does staleness mean? 265 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 266 // - links have been deleted. hmm. 267 total: u64, 268 linking_records: Vec<RecordId>, 269 cursor: Option<OpaqueApiCursor>, 270 #[serde(skip_serializing)] 271 query: GetLinkItemsQuery, 272} 273fn get_links( 274 accept: ExtractAccept, 275 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences 276 store: impl LinkReader, 277) -> Result<impl IntoResponse, http::StatusCode> { 278 let until = query 279 .cursor 280 .clone() 281 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 282 .transpose()? 283 .map(|c| c.next); 284 285 let limit = query.limit; 286 if limit > DEFAULT_CURSOR_LIMIT_MAX { 287 return Err(http::StatusCode::BAD_REQUEST); 288 } 289 290 let mut filter_dids: HashSet<Did> = HashSet::from_iter( 291 query 292 .did 293 .iter() 294 .map(|d| d.trim()) 295 .filter(|d| !d.is_empty()) 296 .map(|d| Did(d.to_string())), 297 ); 298 299 if let Some(comma_joined) = &query.from_dids { 300 if !filter_dids.is_empty() { 301 return Err(http::StatusCode::BAD_REQUEST); 302 } 303 for did in comma_joined.split(',') { 304 filter_dids.insert(Did(did.to_string())); 305 } 306 } 307 308 let paged = store 309 .get_links( 310 &query.target, 311 &query.collection, 312 &query.path, 313 limit, 314 until, 315 &filter_dids, 316 ) 317 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 318 319 let cursor = paged.next.map(|next| { 320 ApiCursor { 321 version: paged.version, 322 next, 323 } 324 .into() 325 }); 326 327 Ok(acceptable( 328 accept, 329 GetLinkItemsResponse { 330 total: paged.total, 331 linking_records: paged.items, 332 cursor, 333 query: (*query).clone(), 334 }, 335 )) 336} 337 338#[derive(Clone, Deserialize)] 339struct GetDidItemsQuery { 340 target: String, 341 collection: String, 342 path: String, 343 cursor: Option<OpaqueApiCursor>, 344 limit: Option<u64>, 345 // TODO: allow reverse (er, forward) order as well 346} 347#[derive(Template, Serialize)] 348#[template(path = "dids.html.j2")] 349struct GetDidItemsResponse { 350 // what does staleness mean? 351 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 352 // - links have been deleted. hmm. 353 total: u64, 354 linking_dids: Vec<Did>, 355 cursor: Option<OpaqueApiCursor>, 356 #[serde(skip_serializing)] 357 query: GetDidItemsQuery, 358} 359fn get_distinct_dids( 360 accept: ExtractAccept, 361 query: Query<GetDidItemsQuery>, 362 store: impl LinkReader, 363) -> Result<impl IntoResponse, http::StatusCode> { 364 let until = query 365 .cursor 366 .clone() 367 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 368 .transpose()? 369 .map(|c| c.next); 370 371 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 372 if limit > DEFAULT_CURSOR_LIMIT_MAX { 373 return Err(http::StatusCode::BAD_REQUEST); 374 } 375 376 let paged = store 377 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until) 378 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 379 380 let cursor = paged.next.map(|next| { 381 ApiCursor { 382 version: paged.version, 383 next, 384 } 385 .into() 386 }); 387 388 Ok(acceptable( 389 accept, 390 GetDidItemsResponse { 391 total: paged.total, 392 linking_dids: paged.items, 393 cursor, 394 query: (*query).clone(), 395 }, 396 )) 397} 398 399#[derive(Clone, Deserialize)] 400struct GetAllLinksQuery { 401 target: String, 402} 403#[derive(Template, Serialize)] 404#[template(path = "links-all-count.html.j2")] 405struct GetAllLinksResponse { 406 links: HashMap<String, HashMap<String, u64>>, 407 #[serde(skip_serializing)] 408 query: GetAllLinksQuery, 409} 410fn count_all_links( 411 accept: ExtractAccept, 412 query: Query<GetAllLinksQuery>, 413 store: impl LinkReader, 414) -> Result<impl IntoResponse, http::StatusCode> { 415 let links = store 416 .get_all_record_counts(&query.target) 417 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 418 Ok(acceptable( 419 accept, 420 GetAllLinksResponse { 421 links, 422 query: (*query).clone(), 423 }, 424 )) 425} 426 427#[derive(Clone, Deserialize)] 428struct ExploreLinksQuery { 429 target: String, 430} 431#[derive(Template, Serialize)] 432#[template(path = "explore-links.html.j2")] 433struct ExploreLinksResponse { 434 links: HashMap<String, HashMap<String, CountsByCount>>, 435 #[serde(skip_serializing)] 436 query: ExploreLinksQuery, 437} 438fn explore_links( 439 accept: ExtractAccept, 440 query: Query<ExploreLinksQuery>, 441 store: impl LinkReader, 442) -> Result<impl IntoResponse, http::StatusCode> { 443 let links = store 444 .get_all_counts(&query.target) 445 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 446 Ok(acceptable( 447 accept, 448 ExploreLinksResponse { 449 links, 450 query: (*query).clone(), 451 }, 452 )) 453} 454 455#[serde_as] 456#[derive(Clone, Serialize, Deserialize)] // for json 457struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>); 458 459#[derive(Serialize, Deserialize)] // for bincode 460struct ApiCursor { 461 version: (u64, u64), // (collection length, deleted item count) 462 next: u64, 463} 464 465impl TryFrom<OpaqueApiCursor> for ApiCursor { 466 type Error = bincode::Error; 467 468 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 469 bincode::DefaultOptions::new().deserialize(&item.0) 470 } 471} 472 473impl From<ApiCursor> for OpaqueApiCursor { 474 fn from(item: ApiCursor) -> Self { 475 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 476 } 477}