Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
mod collections_query;
mod cors;

use crate::index_html::INDEX_HTML;
use crate::storage::StoreReader;
use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor};
use crate::{
    ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild,
    UFOsRecord,
};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use chrono::{DateTime, Utc};
use collections_query::MultiCollectionQuery;
use cors::{OkCors, OkCorsResponse};
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::Body;
use dropshot::ConfigDropshot;
use dropshot::ConfigLogging;
use dropshot::ConfigLoggingLevel;
use dropshot::HttpError;
use dropshot::HttpResponse;
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ServerBuilder;
use dropshot::ServerContext;
use http::{
    header::{ORIGIN, USER_AGENT},
    Response, StatusCode,
};
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn describe_metrics() {
    describe_counter!(
        "server_requests_total",
        Unit::Count,
        "total requests handled"
    );
    describe_histogram!(
        "server_handler_latency",
        Unit::Microseconds,
        "time to respond to a request in microseconds, excluding dropshot overhead"
    );
}

async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
where
    R: HttpResponse,
    H: Future<Output = Result<R, HttpError>>,
    T: ServerContext,
{
    let start = Instant::now();
    let result = handler.await;
    let latency = start.elapsed();
    let status_code = match &result {
        Ok(response) => response.status_code(),
        Err(ref e) => e.status_code.as_status(),
    }
    .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
    .to_string();
    let endpoint = ctx.endpoint.operation_id.clone();
    let headers = ctx.request.headers();
    let origin = headers
        .get(ORIGIN)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("")
        .to_string();
    let ua = headers
        .get(USER_AGENT)
        .and_then(|v| v.to_str().ok())
        .map(|ua| {
            if ua.starts_with("Mozilla/5.0 ") {
                "browser"
            } else {
                ua
            }
        })
        .unwrap_or("")
        .to_string();
    counter!("server_requests_total",
        "endpoint" => endpoint.clone(),
        "origin" => origin,
        "ua" => ua,
        "status_code" => status_code,
    )
    .increment(1);
    histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
    result
}

struct Context {
    pub spec: Arc<serde_json::Value>,
    storage: Box<dyn StoreReader>,
}

fn dt_to_cursor(dt: DateTime<Utc>) -> Result<HourTruncatedCursor, HttpError> {
    let t = dt.timestamp_micros();
    if t < 0 {
        Err(HttpError::for_bad_request(None, "timestamp too old".into()))
    } else {
        let t = t as u64;
        let t_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64;
        const ONE_HOUR: u64 = 60 * 60 * 1_000_000;
        if t > t_now && (t - t_now > 2 * ONE_HOUR) {
            Err(HttpError::for_bad_request(None, "future timestamp".into()))
        } else {
            Ok(HourTruncatedCursor::truncate_raw_u64(t))
        }
    }
}
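
// A minimal test sketch of the bounds enforced by `dt_to_cursor` above: pre-epoch
// datetimes and datetimes more than ~2 hours in the future are rejected, while a
// current timestamp passes through. Assumes only chrono and the function above.
#[cfg(test)]
mod dt_to_cursor_tests {
    use super::*;

    #[test]
    fn rejects_pre_epoch_timestamps() {
        let dt = DateTime::<Utc>::from_timestamp(-1, 0).unwrap();
        assert!(dt_to_cursor(dt).is_err());
    }

    #[test]
    fn rejects_far_future_timestamps() {
        let dt = Utc::now() + chrono::Duration::days(30);
        assert!(dt_to_cursor(dt).is_err());
    }

    #[test]
    fn accepts_the_present() {
        assert!(dt_to_cursor(Utc::now()).is_ok());
    }
}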

/// Serve index page as html
#[endpoint {
    method = GET,
    path = "/",
    /*
     * not useful to have this in openapi
     */
    unpublished = true,
}]
async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
    instrument_handler(&ctx, async {
        Ok(Response::builder()
            .status(StatusCode::OK)
            .header(http::header::CONTENT_TYPE, "text/html")
            .body(INDEX_HTML.into())?)
    })
    .await
}

/// Meta: get the openapi spec for this api
#[endpoint {
    method = GET,
    path = "/openapi",
    /*
     * not useful to have this in openapi
     */
    unpublished = true,
}]
async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
    instrument_handler(&ctx, async {
        let spec = (*ctx.context().spec).clone();
        OkCors(spec).into()
    })
    .await
}

#[derive(Debug, Serialize, JsonSchema)]
struct MetaInfo {
    storage_name: String,
    storage: serde_json::Value,
    consumer: ConsumerInfo,
}
/// UFOs meta-info
#[endpoint {
    method = GET,
    path = "/meta"
}]
async fn get_meta_info(ctx: RequestContext<Context>) -> OkCorsResponse<MetaInfo> {
    let Context { storage, .. } = ctx.context();
    let failed_to_get =
        |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}"));

    instrument_handler(&ctx, async {
        let storage_info = storage
            .get_storage_stats()
            .await
            .map_err(failed_to_get("storage info"))?;

        let consumer = storage
            .get_consumer_info()
            .await
            .map_err(failed_to_get("consumer info"))?;

        OkCors(MetaInfo {
            storage_name: storage.name(),
            storage: storage_info,
            consumer,
        })
        .into()
    })
    .await
}

// TODO: replace with normal (🙃) multi-qs value somehow
fn to_multiple_nsids(s: &str) -> Result<HashSet<Nsid>, String> {
    let mut out = HashSet::new();
    for collection in s.split(',') {
        let Ok(nsid) = Nsid::new(collection.to_string()) else {
            return Err(format!("collection {collection:?} was not a valid NSID"));
        };
        out.insert(nsid);
    }
    Ok(out)
}
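
// A small test sketch for the comma-separated parsing above. It assumes `Nsid::new`
// enforces standard atproto NSID syntax, which the error message above already
// relies on.
#[cfg(test)]
mod to_multiple_nsids_tests {
    use super::*;

    #[test]
    fn splits_on_commas() {
        let nsids = to_multiple_nsids("app.bsky.feed.post,app.bsky.feed.like").unwrap();
        assert_eq!(nsids.len(), 2);
    }

    #[test]
    fn rejects_invalid_nsids() {
        assert!(to_multiple_nsids("definitely not an nsid").is_err());
    }
}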

#[derive(Debug, Deserialize, JsonSchema)]
struct RecordsCollectionsQuery {
    collection: Option<String>, // JsonSchema not implemented for Nsid :(
}
#[derive(Debug, Serialize, JsonSchema)]
struct ApiRecord {
    did: String,
    collection: String,
    rkey: String,
    record: Box<serde_json::value::RawValue>,
    time_us: u64,
}
impl From<UFOsRecord> for ApiRecord {
    fn from(ufo: UFOsRecord) -> Self {
        Self {
            did: ufo.did.to_string(),
            collection: ufo.collection.to_string(),
            rkey: ufo.rkey.to_string(),
            record: ufo.record,
            time_us: ufo.cursor.to_raw_u64(),
        }
    }
}
/// Record samples
///
/// Get most recent records seen in the firehose, by collection NSID
///
/// Multiple collections are supported. They will be delivered in one big array with no
/// specified order.
#[endpoint {
    method = GET,
    path = "/records",
}]
async fn get_records_by_collections(
    ctx: RequestContext<Context>,
    collection_query: Query<RecordsCollectionsQuery>,
) -> OkCorsResponse<Vec<ApiRecord>> {
    let Context { storage, .. } = ctx.context();
    instrument_handler(&ctx, async {
        let mut limit = 42;
        let query = collection_query.into_inner();
        let collections = if let Some(provided_collection) = query.collection {
            to_multiple_nsids(&provided_collection)
                .map_err(|reason| HttpError::for_bad_request(None, reason))?
        } else {
            limit = 12;
            let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data
            let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into();
            let (collections, _) = storage
                .get_collections(
                    1000,
                    Default::default(),
                    Some(since.try_as().unwrap()),
                    None,
                )
                .await
                .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
            collections
                .into_iter()
                .map(|c| Nsid::new(c.nsid).unwrap())
                .collect()
        };

        let records = storage
            .get_records_by_collections(collections, limit, true)
            .await
            .map_err(|e| HttpError::for_internal_error(e.to_string()))?
            .into_iter()
            .map(|r| r.into())
            .collect();

        OkCors(records).into()
    })
    .await
}

#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionsStatsQuery {
    /// Limit stats to those seen after this UTC datetime
    ///
    /// default: 1 week ago
    since: Option<DateTime<Utc>>,
    /// Limit stats to those seen before this UTC datetime
    ///
    /// default: now
    until: Option<DateTime<Utc>>,
}
/// Collection stats
///
/// Get record statistics for collections during a specific time period.
///
/// Note: the statistics are "rolled up" into hourly buckets in the background,
/// so the data here can be as stale as that background task is behind. See the
/// meta info endpoint to find out how up-to-date the rollup currently is. (In
/// general it should be pretty close to live)
#[endpoint {
    method = GET,
    path = "/collections/stats"
}]
async fn get_collection_stats(
    ctx: RequestContext<Context>,
    collections_query: MultiCollectionQuery,
    query: Query<CollectionsStatsQuery>,
) -> OkCorsResponse<HashMap<String, JustCount>> {
    let Context { storage, .. } = ctx.context();

    instrument_handler(&ctx, async {
        let q = query.into_inner();
        let collections: HashSet<Nsid> = collections_query.try_into()?;

        let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| {
            let week_ago_secs = 7 * 86_400;
            let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs);
            Cursor::at(week_ago).into()
        });

        let until = q.until.map(dt_to_cursor).transpose()?;

        let mut seen_by_collection = HashMap::with_capacity(collections.len());

        for collection in &collections {
            let counts = storage
                .get_collection_counts(collection, since, until)
                .await
                .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;

            seen_by_collection.insert(collection.to_string(), counts);
        }

        OkCors(seen_by_collection).into()
    })
    .await
}

#[derive(Debug, Serialize, JsonSchema)]
struct CollectionsResponse {
    /// Each known collection and its associated statistics
    ///
    /// The order is unspecified.
    collections: Vec<NsidCount>,
    /// Include in a follow-up request to get the next page of results, if more are available
    cursor: Option<String>,
}
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub enum CollectionsQueryOrder {
    RecordsCreated,
    DidsEstimate,
}
impl From<&CollectionsQueryOrder> for OrderCollectionsBy {
    fn from(q: &CollectionsQueryOrder) -> Self {
        match q {
            CollectionsQueryOrder::RecordsCreated => OrderCollectionsBy::RecordsCreated,
            CollectionsQueryOrder::DidsEstimate => OrderCollectionsBy::DidsEstimate,
        }
    }
}
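
// A brief test sketch (assuming serde_json, which this module already uses) pinning
// down the query values produced by the kebab-case rename above: `records-created`
// and `dids-estimate`, as referenced in the endpoint docs below.
#[cfg(test)]
mod order_param_tests {
    use super::CollectionsQueryOrder;

    #[test]
    fn order_values_are_kebab_case() {
        assert!(matches!(
            serde_json::from_str::<CollectionsQueryOrder>("\"records-created\""),
            Ok(CollectionsQueryOrder::RecordsCreated)
        ));
        assert!(matches!(
            serde_json::from_str::<CollectionsQueryOrder>("\"dids-estimate\""),
            Ok(CollectionsQueryOrder::DidsEstimate)
        ));
    }
}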

#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionsQuery {
    /// The maximum number of collections to return in one request.
    ///
    /// Default: `100` normally, `32` if `order` is specified.
    #[schemars(range(min = 1, max = 200))]
    limit: Option<usize>,
    /// Get a paginated response with more collections.
    ///
    /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request.
    ///
    /// `cursor` is mutually exclusive with `order`.
    cursor: Option<String>,
    /// Limit collections and statistics to those seen after this UTC datetime
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    until: Option<DateTime<Utc>>,
    /// Get a limited, sorted list
    ///
    /// Mutually exclusive with `cursor` -- sorted results cannot be paged.
    order: Option<CollectionsQueryOrder>,
}

/// List collections
///
/// With statistics.
///
/// ## To fetch a full list:
///
/// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginated requests with `cursor`s to get them all.
///
/// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot:
///
/// - all collection NSIDs observed before the first request will be included in the results
///
/// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set
///
/// - no duplicate NSIDs will occur in the combined results
///
/// In practice this is close enough for most use-cases to not worry about.
///
/// ## To fetch the top collection NSIDs:
///
/// Specify the `order` parameter (must be either `records-created` or `dids-estimate`). Note that ordered results cannot be paged.
///
/// All statistics are bucketed hourly, so the most granular effective time boundary for `since` and `until` is one hour.
#[endpoint {
    method = GET,
    path = "/collections"
}]
async fn get_collections(
    ctx: RequestContext<Context>,
    query: Query<CollectionsQuery>,
) -> OkCorsResponse<CollectionsResponse> {
    let Context { storage, .. } = ctx.context();
    let q = query.into_inner();

    instrument_handler(&ctx, async {
        if q.cursor.is_some() && q.order.is_some() {
            let msg =
                "`cursor` is mutually exclusive with `order`. ordered results cannot be paged.";
            return Err(HttpError::for_bad_request(None, msg.to_string()));
        }

        let order = if let Some(ref o) = q.order {
            o.into()
        } else {
            let cursor = q
                .cursor
                .and_then(|c| if c.is_empty() { None } else { Some(c) })
                .map(|c| URL_SAFE_NO_PAD.decode(&c))
                .transpose()
                .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?;
            OrderCollectionsBy::Lexi { cursor }
        };

        let limit = match (q.limit, q.order) {
            (Some(limit), _) => limit,
            (None, Some(_)) => 32,
            (None, None) => 100,
        };

        if !(1..=200).contains(&limit) {
            let msg = format!("limit not in 1..=200: {limit}");
            return Err(HttpError::for_bad_request(None, msg));
        }

        let since = q.since.map(dt_to_cursor).transpose()?;
        let until = q.until.map(dt_to_cursor).transpose()?;

        let (collections, next_cursor) = storage
            .get_collections(limit, order, since, until)
            .await
            .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;

        let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));

        OkCors(CollectionsResponse {
            collections,
            cursor: next_cursor,
        })
        .into()
    })
    .await
}
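
// A rough client-side sketch (not part of this crate) of walking the full paginated
// list described above. It assumes `reqwest` on the client and a reachable host;
// `Page` is a hypothetical client-side mirror of `CollectionsResponse`.
//
// #[derive(serde::Deserialize)]
// struct Page {
//     collections: Vec<serde_json::Value>,
//     cursor: Option<String>,
// }
//
// async fn fetch_all_collections(host: &str) -> Result<Vec<serde_json::Value>, reqwest::Error> {
//     let mut all = Vec::new();
//     let mut cursor: Option<String> = None;
//     loop {
//         let mut url = format!("{host}/collections?limit=200");
//         if let Some(c) = &cursor {
//             url.push_str(&format!("&cursor={c}"));
//         }
//         let page: Page = reqwest::get(url).await?.json().await?;
//         all.extend(page.collections);
//         match page.cursor {
//             Some(next) => cursor = Some(next),
//             None => break,
//         }
//     }
//     Ok(all)
// }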

#[derive(Debug, Serialize, JsonSchema)]
struct PrefixResponse {
    /// Note that total may not include counts beyond the current page (TODO)
    total: JustCount,
    children: Vec<PrefixChild>,
    /// Include in a follow-up request to get the next page of results, if more are available
    cursor: Option<String>,
}
#[derive(Debug, Deserialize, JsonSchema)]
struct PrefixQuery {
    /// The NSID prefix (lexicon group) to filter by
    ///
    /// The final segment of a collection NSID is the `name`, and everything before it is called its `group`. eg:
    ///
    /// - `app.bsky.feed.post` and `app.bsky.feed.like` are both in the _lexicon group_ "`app.bsky.feed`".
    ///
    prefix: String,
    /// The maximum number of collections to return in one request.
    ///
    /// The number of items actually returned may be less than the limit. If paginating, this does **not** indicate that no
    /// more items are available! Check if the `cursor` in the response is `null` to determine the end of items.
    ///
    /// Default: `100` normally, `32` if `order` is specified.
    #[schemars(range(min = 1, max = 200))]
    limit: Option<usize>,
    /// Get a paginated response with more collections.
    ///
    /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request.
    ///
    /// `cursor` is mutually exclusive with `order`.
    cursor: Option<String>,
    /// Limit collections and statistics to those seen after this UTC datetime
    ///
    /// Default: all-time
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    ///
    /// Default: now
    until: Option<DateTime<Utc>>,
    /// Get a limited, sorted list
    ///
    /// Mutually exclusive with `cursor` -- sorted results cannot be paged.
    order: Option<CollectionsQueryOrder>,
}
/// Prefix-filter collections list
///
/// This endpoint enumerates all collection NSIDs for a lexicon group.
///
/// ## To fetch a full list:
///
/// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginated requests with `cursor`s to get them all.
///
/// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot:
///
/// - all collection NSIDs observed before the first request will be included in the results
///
/// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set
///
/// - no duplicate NSIDs will occur in the combined results
///
/// In practice this is close enough for most use-cases to not worry about.
///
/// ## To fetch the top collection NSIDs:
///
/// Specify the `order` parameter (must be either `records-created` or `dids-estimate`). Note that ordered results cannot be paged.
///
/// All statistics are bucketed hourly, so the most granular effective time boundary for `since` and `until` is one hour.
#[endpoint {
    method = GET,
    path = "/prefix"
}]
async fn get_prefix(
    ctx: RequestContext<Context>,
    query: Query<PrefixQuery>,
) -> OkCorsResponse<PrefixResponse> {
    let Context { storage, .. } = ctx.context();
    let q = query.into_inner();

    instrument_handler(&ctx, async {
        let prefix = NsidPrefix::new(&q.prefix).map_err(|e| {
            HttpError::for_bad_request(
                None,
                format!("{:?} was not a valid NSID prefix: {e:?}", q.prefix),
            )
        })?;

        if q.cursor.is_some() && q.order.is_some() {
            let msg =
                "`cursor` is mutually exclusive with `order`. ordered results cannot be paged.";
            return Err(HttpError::for_bad_request(None, msg.to_string()));
        }

        let order = if let Some(ref o) = q.order {
            o.into()
        } else {
            let cursor = q
                .cursor
                .and_then(|c| if c.is_empty() { None } else { Some(c) })
                .map(|c| URL_SAFE_NO_PAD.decode(&c))
                .transpose()
                .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?;
            OrderCollectionsBy::Lexi { cursor }
        };

        let limit = match (q.limit, q.order) {
            (Some(limit), _) => limit,
            (None, Some(_)) => 32,
            (None, None) => 100,
        };

        if !(1..=200).contains(&limit) {
            let msg = format!("limit not in 1..=200: {limit}");
            return Err(HttpError::for_bad_request(None, msg));
        }

        let since = q.since.map(dt_to_cursor).transpose()?;
        let until = q.until.map(dt_to_cursor).transpose()?;

        let (total, children, next_cursor) = storage
            .get_prefix(prefix, limit, order, since, until)
            .await
            .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;

        let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));

        OkCors(PrefixResponse {
            total,
            children,
            cursor: next_cursor,
        })
        .into()
    })
    .await
}
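
// A small test sketch for the prefix parsing used above; it assumes `NsidPrefix::new`
// accepts group-style prefixes like the `app.bsky.feed` example in the docs.
#[cfg(test)]
mod prefix_param_tests {
    use super::*;

    #[test]
    fn lexicon_group_is_a_valid_prefix() {
        assert!(NsidPrefix::new("app.bsky.feed").is_ok());
    }

    #[test]
    fn junk_is_not_a_valid_prefix() {
        assert!(NsidPrefix::new("definitely not a prefix").is_err());
    }
}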

#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionTimeseriesQuery {
    collection: String, // JsonSchema not implemented for Nsid :(
    /// Limit collections and statistics to those seen after this UTC datetime
    ///
    /// default: 1 week ago
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    ///
    /// default: now
    until: Option<DateTime<Utc>>,
    /// time steps between data, in seconds
    ///
    /// the step will be rounded down to the nearest hour
    ///
    /// default: 86400 (24hrs)
    #[schemars(range(min = 3600))]
    step: Option<u64>,
    // todo: rolling averages
}
#[derive(Debug, Serialize, JsonSchema)]
struct CollectionTimeseriesResponse {
    range: Vec<DateTime<Utc>>,
    series: HashMap<String, Vec<JustCount>>,
}
/// Collection timeseries stats
#[endpoint {
    method = GET,
    path = "/timeseries"
}]
async fn get_timeseries(
    ctx: RequestContext<Context>,
    query: Query<CollectionTimeseriesQuery>,
) -> OkCorsResponse<CollectionTimeseriesResponse> {
    let Context { storage, .. } = ctx.context();
    let q = query.into_inner();

    instrument_handler(&ctx, async {
        let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| {
            let week_ago_secs = 7 * 86_400;
            let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs);
            Cursor::at(week_ago).into()
        });

        let until = q.until.map(dt_to_cursor).transpose()?;

        let step = if let Some(secs) = q.step {
            if secs < 3600 {
                let msg = format!("step is too small: {secs}");
                Err(HttpError::for_bad_request(None, msg))?;
            }
            (secs / 3600) * 3600 // truncate to the hour
        } else {
            86_400
        };

        let nsid = Nsid::new(q.collection).map_err(|e| {
            HttpError::for_bad_request(None, format!("collection was not a valid NSID: {e:?}"))
        })?;

        let (range_cursors, series) = storage
            .get_timeseries(vec![nsid], since, until, step)
            .await
            .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;

        let range = range_cursors
            .into_iter()
            .map(|c| DateTime::<Utc>::from_timestamp_micros(c.to_raw_u64() as i64).unwrap())
            .collect();

        let series = series
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect()))
            .collect();

        OkCors(CollectionTimeseriesResponse { range, series }).into()
    })
    .await
}

#[derive(Debug, Deserialize, JsonSchema)]
struct SearchQuery {
    /// Query
    ///
    /// at least two alphanumeric (+hyphen) characters must be present
    q: String,
}
#[derive(Debug, Serialize, JsonSchema)]
struct SearchResponse {
    matches: Vec<NsidCount>,
}
/// Search lexicons
#[endpoint {
    method = GET,
    path = "/search"
}]
async fn search_collections(
    ctx: RequestContext<Context>,
    query: Query<SearchQuery>,
) -> OkCorsResponse<SearchResponse> {
    let Context { storage, .. } = ctx.context();
    let q = query.into_inner();
    instrument_handler(&ctx, async {
        // TODO: query validation
        // TODO: also handle multi-space stuff (ufos-app tries to on client)
        let terms: Vec<String> = q.q.split(' ').map(Into::into).collect();
        let matches = storage
            .search_collections(terms)
            .await
            .map_err(|e| HttpError::for_internal_error(format!("oh ugh: {e:?}")))?;
        OkCors(SearchResponse { matches }).into()
    })
    .await
}

pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
    describe_metrics();
    let log = ConfigLogging::StderrTerminal {
        level: ConfigLoggingLevel::Warn,
    }
    .to_logger("server")
    .map_err(|e| e.to_string())?;

    let mut api = ApiDescription::new();

    api.register(index).unwrap();
    api.register(get_openapi).unwrap();
    api.register(get_meta_info).unwrap();
    api.register(get_records_by_collections).unwrap();
    api.register(get_collection_stats).unwrap();
    api.register(get_collections).unwrap();
    api.register(get_prefix).unwrap();
    api.register(get_timeseries).unwrap();
    api.register(search_collections).unwrap();

    let context = Context {
        spec: Arc::new(
            api.openapi(
                "UFOs API: Every lexicon in the ATmosphere",
                env!("CARGO_PKG_VERSION")
                    .parse()
                    .inspect_err(|e| {
                        log::warn!("failed to parse cargo package version for openapi: {e:?}")
                    })
                    .unwrap_or(semver::Version::new(0, 0, 1)),
            )
            .description("Samples and statistics of atproto records by their collection NSID")
            .contact_name("part of @microcosm.blue")
            .contact_url("https://microcosm.blue")
            .json()
            .map_err(|e| e.to_string())?,
        ),
        storage: Box::new(storage),
    };

    ServerBuilder::new(api, context, log)
        .config(ConfigDropshot {
            bind_address: "0.0.0.0:9999".parse().unwrap(),
            ..Default::default()
        })
        .start()
        .map_err(|error| format!("failed to start server: {error}"))?
        .await
}
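
// A rough sketch of wiring `serve` up from a binary crate; `ExampleStore` is a
// hypothetical `StoreReader` implementation, and the exact module path of `serve`
// depends on how the crate is laid out. Dropshot servers run on tokio.
//
// #[tokio::main]
// async fn main() -> Result<(), String> {
//     let storage = ExampleStore::open("./ufos-data").map_err(|e| e.to_string())?;
//     serve(storage).await
// }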