// Forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1mod collections_query;
2mod cors;
3
4use crate::index_html::INDEX_HTML;
5use crate::storage::StoreReader;
6use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor};
7use crate::{
8 ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild,
9 UFOsRecord,
10};
11use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
12use chrono::{DateTime, Utc};
13use collections_query::MultiCollectionQuery;
14use cors::{OkCors, OkCorsResponse};
15use dropshot::endpoint;
16use dropshot::ApiDescription;
17use dropshot::Body;
18use dropshot::ConfigDropshot;
19use dropshot::ConfigLogging;
20use dropshot::ConfigLoggingLevel;
21use dropshot::HttpError;
22use dropshot::HttpResponse;
23use dropshot::Query;
24use dropshot::RequestContext;
25use dropshot::ServerBuilder;
26use dropshot::ServerContext;
27use http::{
28 header::{ORIGIN, USER_AGENT},
29 Response, StatusCode,
30};
31use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
32use schemars::JsonSchema;
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::future::Future;
36use std::sync::Arc;
37use std::time::Instant;
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
/// Register units and help text for this server's metrics with the global
/// metrics recorder, so exporters can surface proper descriptions.
fn describe_metrics() {
    describe_counter!(
        "server_requests_total",
        Unit::Count,
        "total requests handled"
    );
    describe_histogram!(
        "server_handler_latency",
        Unit::Microseconds,
        "time to respond to a request in microseconds, excluding dropshot overhead"
    );
}
52
53async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
54where
55 R: HttpResponse,
56 H: Future<Output = Result<R, HttpError>>,
57 T: ServerContext,
58{
59 let start = Instant::now();
60 let result = handler.await;
61 let latency = start.elapsed();
62 let status_code = match &result {
63 Ok(response) => response.status_code(),
64 Err(ref e) => e.status_code.as_status(),
65 }
66 .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
67 .to_string();
68 let endpoint = ctx.endpoint.operation_id.clone();
69 let headers = ctx.request.headers();
70 let origin = headers
71 .get(ORIGIN)
72 .and_then(|v| v.to_str().ok())
73 .unwrap_or("")
74 .to_string();
75 let ua = headers
76 .get(USER_AGENT)
77 .and_then(|v| v.to_str().ok())
78 .map(|ua| {
79 if ua.starts_with("Mozilla/5.0 ") {
80 "browser"
81 } else {
82 ua
83 }
84 })
85 .unwrap_or("")
86 .to_string();
87 counter!("server_requests_total",
88 "endpoint" => endpoint.clone(),
89 "origin" => origin,
90 "ua" => ua,
91 "status_code" => status_code,
92 )
93 .increment(1);
94 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
95 result
96}
97
/// Shared per-server state handed to every request handler.
struct Context {
    // Pre-rendered OpenAPI spec; served verbatim by `get_openapi`.
    pub spec: Arc<serde_json::Value>,
    // Backing store used to answer all data queries.
    storage: Box<dyn StoreReader>,
}
102
103fn dt_to_cursor(dt: DateTime<Utc>) -> Result<HourTruncatedCursor, HttpError> {
104 let t = dt.timestamp_micros();
105 if t < 0 {
106 Err(HttpError::for_bad_request(None, "timestamp too old".into()))
107 } else {
108 let t = t as u64;
109 let t_now = SystemTime::now()
110 .duration_since(UNIX_EPOCH)
111 .unwrap()
112 .as_micros() as u64;
113 const ONE_HOUR: u64 = 60 * 60 * 1_000_000;
114 if t > t_now && (t - t_now > 2 * ONE_HOUR) {
115 Err(HttpError::for_bad_request(None, "future timestamp".into()))
116 } else {
117 Ok(HourTruncatedCursor::truncate_raw_u64(t))
118 }
119 }
120}
121
122/// Serve index page as html
123#[endpoint {
124 method = GET,
125 path = "/",
126 /*
127 * not useful to have this in openapi
128 */
129 unpublished = true,
130}]
131async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
132 instrument_handler(&ctx, async {
133 Ok(Response::builder()
134 .status(StatusCode::OK)
135 .header(http::header::CONTENT_TYPE, "text/html")
136 .body(INDEX_HTML.into())?)
137 })
138 .await
139}
140
141/// Meta: get the openapi spec for this api
142#[endpoint {
143 method = GET,
144 path = "/openapi",
145 /*
146 * not useful to have this in openapi
147 */
148 unpublished = true,
149}]
150async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
151 instrument_handler(&ctx, async {
152 let spec = (*ctx.context().spec).clone();
153 OkCors(spec).into()
154 })
155 .await
156}
157
// Response body for `/meta`: storage stats plus firehose consumer info.
// (plain `//` comments here on purpose: `///` would flow into the JsonSchema
// descriptions and change the served OpenAPI spec)
#[derive(Debug, Serialize, JsonSchema)]
struct MetaInfo {
    // which storage backend implementation is in use
    storage_name: String,
    // backend-specific stats blob; shape depends on the implementation
    storage: serde_json::Value,
    // firehose consumer progress/health info
    consumer: ConsumerInfo,
}
164/// UFOs meta-info
165#[endpoint {
166 method = GET,
167 path = "/meta"
168}]
169async fn get_meta_info(ctx: RequestContext<Context>) -> OkCorsResponse<MetaInfo> {
170 let Context { storage, .. } = ctx.context();
171 let failed_to_get =
172 |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}"));
173
174 instrument_handler(&ctx, async {
175 let storage_info = storage
176 .get_storage_stats()
177 .await
178 .map_err(failed_to_get("storage info"))?;
179
180 let consumer = storage
181 .get_consumer_info()
182 .await
183 .map_err(failed_to_get("consumer info"))?;
184
185 OkCors(MetaInfo {
186 storage_name: storage.name(),
187 storage: storage_info,
188 consumer,
189 })
190 .into()
191 })
192 .await
193}
194
195// TODO: replace with normal (🙃) multi-qs value somehow
196fn to_multiple_nsids(s: &str) -> Result<HashSet<Nsid>, String> {
197 let mut out = HashSet::new();
198 for collection in s.split(',') {
199 let Ok(nsid) = Nsid::new(collection.to_string()) else {
200 return Err(format!("collection {collection:?} was not a valid NSID"));
201 };
202 out.insert(nsid);
203 }
204 Ok(out)
205}
206
// Query string for `/records`: optional comma-separated collection NSIDs.
#[derive(Debug, Deserialize, JsonSchema)]
struct RecordsCollectionsQuery {
    collection: Option<String>, // JsonSchema not implemented for Nsid :(
}
// Wire format for a single sampled firehose record.
#[derive(Debug, Serialize, JsonSchema)]
struct ApiRecord {
    did: String,
    collection: String,
    rkey: String,
    // raw JSON passed through unparsed, avoiding a decode/encode round-trip
    record: Box<serde_json::value::RawValue>,
    // the record's firehose cursor, in microseconds since the unix epoch
    time_us: u64,
}
impl From<UFOsRecord> for ApiRecord {
    // Flatten the internal record type into plain strings + raw JSON.
    fn from(ufo: UFOsRecord) -> Self {
        Self {
            did: ufo.did.to_string(),
            collection: ufo.collection.to_string(),
            rkey: ufo.rkey.to_string(),
            record: ufo.record,
            time_us: ufo.cursor.to_raw_u64(),
        }
    }
}
230/// Record samples
231///
232/// Get most recent records seen in the firehose, by collection NSID
233///
234/// Multiple collections are supported. They will be delivered in one big array with no
235/// specified order.
236#[endpoint {
237 method = GET,
238 path = "/records",
239}]
240async fn get_records_by_collections(
241 ctx: RequestContext<Context>,
242 collection_query: Query<RecordsCollectionsQuery>,
243) -> OkCorsResponse<Vec<ApiRecord>> {
244 let Context { storage, .. } = ctx.context();
245 instrument_handler(&ctx, async {
246 let mut limit = 42;
247 let query = collection_query.into_inner();
248 let collections = if let Some(provided_collection) = query.collection {
249 to_multiple_nsids(&provided_collection)
250 .map_err(|reason| HttpError::for_bad_request(None, reason))?
251 } else {
252 limit = 12;
253 let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data
254 let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into();
255 let (collections, _) = storage
256 .get_collections(
257 1000,
258 Default::default(),
259 Some(since.try_as().unwrap()),
260 None,
261 )
262 .await
263 .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
264 collections
265 .into_iter()
266 .map(|c| Nsid::new(c.nsid).unwrap())
267 .collect()
268 };
269
270 let records = storage
271 .get_records_by_collections(collections, limit, true)
272 .await
273 .map_err(|e| HttpError::for_internal_error(e.to_string()))?
274 .into_iter()
275 .map(|r| r.into())
276 .collect();
277
278 OkCors(records).into()
279 })
280 .await
281}
282
// Query string for `/collections/stats`: the [since, until) stats window.
#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionsStatsQuery {
    /// Limit stats to those seen after this UTC datetime
    ///
    /// default: 1 week ago
    since: Option<DateTime<Utc>>,
    /// Limit stats to those seen before this UTC datetime
    ///
    /// default: now
    until: Option<DateTime<Utc>>,
}
294/// Collection stats
295///
296/// Get record statistics for collections during a specific time period.
297///
298/// Note: the statistics are "rolled up" into hourly buckets in the background,
299/// so the data here can be as stale as that background task is behind. See the
300/// meta info endpoint to find out how up-to-date the rollup currently is. (In
301/// general it sholud be pretty close to live)
302#[endpoint {
303 method = GET,
304 path = "/collections/stats"
305}]
306async fn get_collection_stats(
307 ctx: RequestContext<Context>,
308 collections_query: MultiCollectionQuery,
309 query: Query<CollectionsStatsQuery>,
310) -> OkCorsResponse<HashMap<String, JustCount>> {
311 let Context { storage, .. } = ctx.context();
312
313 instrument_handler(&ctx, async {
314 let q = query.into_inner();
315 let collections: HashSet<Nsid> = collections_query.try_into()?;
316
317 let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| {
318 let week_ago_secs = 7 * 86_400;
319 let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs);
320 Cursor::at(week_ago).into()
321 });
322
323 let until = q.until.map(dt_to_cursor).transpose()?;
324
325 let mut seen_by_collection = HashMap::with_capacity(collections.len());
326
327 for collection in &collections {
328 let counts = storage
329 .get_collection_counts(collection, since, until)
330 .await
331 .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
332
333 seen_by_collection.insert(collection.to_string(), counts);
334 }
335
336 OkCors(seen_by_collection).into()
337 })
338 .await
339}
340
// Response body for `/collections`.
#[derive(Debug, Serialize, JsonSchema)]
struct CollectionsResponse {
    /// Each known collection and its associated statistics
    ///
    /// The order is unspecified.
    collections: Vec<NsidCount>,
    /// Include in a follow-up request to get the next page of results, if more are available
    cursor: Option<String>,
}
// Sort orders accepted by `/collections` and `/prefix`.
// Serialized in kebab-case: `records-created`, `dids-estimate`.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub enum CollectionsQueryOrder {
    RecordsCreated,
    DidsEstimate,
}
impl From<&CollectionsQueryOrder> for OrderCollectionsBy {
    // Map the API-facing enum onto the storage layer's ordering type.
    fn from(q: &CollectionsQueryOrder) -> Self {
        match q {
            CollectionsQueryOrder::RecordsCreated => OrderCollectionsBy::RecordsCreated,
            CollectionsQueryOrder::DidsEstimate => OrderCollectionsBy::DidsEstimate,
        }
    }
}
// Query string for `/collections`. `cursor` and `order` are mutually
// exclusive; the handler enforces this at runtime.
#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionsQuery {
    /// The maximum number of collections to return in one request.
    ///
    /// Default: `100` normally, `32` if `order` is specified.
    #[schemars(range(min = 1, max = 200))]
    limit: Option<usize>,
    /// Get a paginated response with more collections.
    ///
    /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request.
    ///
    /// `cursor` is mutually exclusive with `order`.
    cursor: Option<String>,
    /// Limit collections and statistics to those seen after this UTC datetime
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    until: Option<DateTime<Utc>>,
    /// Get a limited, sorted list
    ///
    /// Mutually exclusive with `cursor` -- sorted results cannot be paged.
    order: Option<CollectionsQueryOrder>,
}
386
387/// List collections
388///
389/// With statistics.
390///
391/// ## To fetch a full list:
392///
393/// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all.
394///
395/// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot:
396///
397/// - all collection NSIDs observed before the first request will be included in the results
398///
399/// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set
400///
401/// - no duplicate NSIDs will occur in the combined results
402///
403/// In practice this is close enough for most use-cases to not worry about.
404///
405/// ## To fetch the top collection NSIDs:
406///
407/// Specify the `order` parameter (must be either `records-created` or `did-estimate`). Note that ordered results cannot be paged.
408///
409/// All statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour.
410#[endpoint {
411 method = GET,
412 path = "/collections"
413}]
414async fn get_collections(
415 ctx: RequestContext<Context>,
416 query: Query<CollectionsQuery>,
417) -> OkCorsResponse<CollectionsResponse> {
418 let Context { storage, .. } = ctx.context();
419 let q = query.into_inner();
420
421 instrument_handler(&ctx, async {
422 if q.cursor.is_some() && q.order.is_some() {
423 let msg =
424 "`cursor` is mutually exclusive with `order`. ordered results cannot be paged.";
425 return Err(HttpError::for_bad_request(None, msg.to_string()));
426 }
427
428 let order = if let Some(ref o) = q.order {
429 o.into()
430 } else {
431 let cursor = q
432 .cursor
433 .and_then(|c| if c.is_empty() { None } else { Some(c) })
434 .map(|c| URL_SAFE_NO_PAD.decode(&c))
435 .transpose()
436 .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?;
437 OrderCollectionsBy::Lexi { cursor }
438 };
439
440 let limit = match (q.limit, q.order) {
441 (Some(limit), _) => limit,
442 (None, Some(_)) => 32,
443 (None, None) => 100,
444 };
445
446 if !(1..=200).contains(&limit) {
447 let msg = format!("limit not in 1..=200: {limit}");
448 return Err(HttpError::for_bad_request(None, msg));
449 }
450
451 let since = q.since.map(dt_to_cursor).transpose()?;
452 let until = q.until.map(dt_to_cursor).transpose()?;
453
454 let (collections, next_cursor) = storage
455 .get_collections(limit, order, since, until)
456 .await
457 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;
458
459 let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));
460
461 OkCors(CollectionsResponse {
462 collections,
463 cursor: next_cursor,
464 })
465 .into()
466 })
467 .await
468}
469
// Response body for `/prefix`.
#[derive(Debug, Serialize, JsonSchema)]
struct PrefixResponse {
    /// Note that total may not include counts beyond the current page (TODO)
    total: JustCount,
    children: Vec<PrefixChild>,
    /// Include in a follow-up request to get the next page of results, if more are available
    cursor: Option<String>,
}
// Query string for `/prefix`. Mirrors CollectionsQuery plus the required
// `prefix`; `cursor` and `order` are mutually exclusive (enforced at runtime).
#[derive(Debug, Deserialize, JsonSchema)]
struct PrefixQuery {
    /// The NSID prefix (lexicon group) to filter by
    ///
    /// The final segment of a collection NSID is the `name`, and everything before it is called its `group`. eg:
    ///
    /// - `app.bsky.feed.post` and `app.bsky.feed.like` are both in the _lexicon group_ "`app.bsky.feed`".
    ///
    prefix: String,
    /// The maximum number of collections to return in one request.
    ///
    /// The number of items actually returned may be less than the limit. If paginating, this does **not** indicate that no
    /// more items are available! Check if the `cursor` in the response is `null` to determine the end of items.
    ///
    /// Default: `100` normally, `32` if `order` is specified.
    #[schemars(range(min = 1, max = 200))]
    limit: Option<usize>,
    /// Get a paginated response with more collections.
    ///
    /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request.
    ///
    /// `cursor` is mutually exclusive with `order`.
    cursor: Option<String>,
    /// Limit collections and statistics to those seen after this UTC datetime
    ///
    /// Default: all-time
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    ///
    /// Default: now
    until: Option<DateTime<Utc>>,
    /// Get a limited, sorted list
    ///
    /// Mutually exclusive with `cursor` -- sorted results cannot be paged.
    order: Option<CollectionsQueryOrder>,
}
513/// Prefix-filter collections list
514///
515/// This endpoint enumerates all collection NSIDs for a lexicon group.
516///
517/// ## To fetch a full list:
518///
519/// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all.
520///
521/// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot:
522///
523/// - all collection NSIDs observed before the first request will be included in the results
524///
525/// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set
526///
527/// - no duplicate NSIDs will occur in the combined results
528///
529/// In practice this is close enough for most use-cases to not worry about.
530///
531/// ## To fetch the top collection NSIDs:
532///
533/// Specify the `order` parameter (must be either `records-created` or `did-estimate`). Note that ordered results cannot be paged.
534///
535/// All statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour.
536#[endpoint {
537 method = GET,
538 path = "/prefix"
539}]
540async fn get_prefix(
541 ctx: RequestContext<Context>,
542 query: Query<PrefixQuery>,
543) -> OkCorsResponse<PrefixResponse> {
544 let Context { storage, .. } = ctx.context();
545 let q = query.into_inner();
546
547 instrument_handler(&ctx, async {
548 let prefix = NsidPrefix::new(&q.prefix).map_err(|e| {
549 HttpError::for_bad_request(
550 None,
551 format!("{:?} was not a valid NSID prefix: {e:?}", q.prefix),
552 )
553 })?;
554
555 if q.cursor.is_some() && q.order.is_some() {
556 let msg =
557 "`cursor` is mutually exclusive with `order`. ordered results cannot be paged.";
558 return Err(HttpError::for_bad_request(None, msg.to_string()));
559 }
560
561 let order = if let Some(ref o) = q.order {
562 o.into()
563 } else {
564 let cursor = q
565 .cursor
566 .and_then(|c| if c.is_empty() { None } else { Some(c) })
567 .map(|c| URL_SAFE_NO_PAD.decode(&c))
568 .transpose()
569 .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?;
570 OrderCollectionsBy::Lexi { cursor }
571 };
572
573 let limit = match (q.limit, q.order) {
574 (Some(limit), _) => limit,
575 (None, Some(_)) => 32,
576 (None, None) => 100,
577 };
578
579 if !(1..=200).contains(&limit) {
580 let msg = format!("limit not in 1..=200: {limit}");
581 return Err(HttpError::for_bad_request(None, msg));
582 }
583
584 let since = q.since.map(dt_to_cursor).transpose()?;
585 let until = q.until.map(dt_to_cursor).transpose()?;
586
587 let (total, children, next_cursor) = storage
588 .get_prefix(prefix, limit, order, since, until)
589 .await
590 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;
591
592 let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));
593
594 OkCors(PrefixResponse {
595 total,
596 children,
597 cursor: next_cursor,
598 })
599 .into()
600 })
601 .await
602}
603
// Query string for `/timeseries`.
#[derive(Debug, Deserialize, JsonSchema)]
struct CollectionTimeseriesQuery {
    collection: String, // JsonSchema not implemented for Nsid :(
    /// Limit collections and statistics to those seen after this UTC datetime
    ///
    /// default: 1 week ago
    since: Option<DateTime<Utc>>,
    /// Limit collections and statistics to those seen before this UTC datetime
    ///
    /// default: now
    until: Option<DateTime<Utc>>,
    /// time steps between data, in seconds
    ///
    /// the step will be rounded down to the nearest hour
    ///
    /// default: 86400 (24hrs)
    #[schemars(range(min = 3600))]
    step: Option<u64>,
    // todo: rolling averages
}
// Response body for `/timeseries`: bucket start times in `range`, and one
// parallel vector of counts per collection in `series`.
#[derive(Debug, Serialize, JsonSchema)]
struct CollectionTimeseriesResponse {
    range: Vec<DateTime<Utc>>,
    series: HashMap<String, Vec<JustCount>>,
}
629/// Collection timeseries stats
630#[endpoint {
631 method = GET,
632 path = "/timeseries"
633}]
634async fn get_timeseries(
635 ctx: RequestContext<Context>,
636 query: Query<CollectionTimeseriesQuery>,
637) -> OkCorsResponse<CollectionTimeseriesResponse> {
638 let Context { storage, .. } = ctx.context();
639 let q = query.into_inner();
640
641 instrument_handler(&ctx, async {
642 let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| {
643 let week_ago_secs = 7 * 86_400;
644 let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs);
645 Cursor::at(week_ago).into()
646 });
647
648 let until = q.until.map(dt_to_cursor).transpose()?;
649
650 let step = if let Some(secs) = q.step {
651 if secs < 3600 {
652 let msg = format!("step is too small: {secs}");
653 Err(HttpError::for_bad_request(None, msg))?;
654 }
655 (secs / 3600) * 3600 // trucate to hour
656 } else {
657 86_400
658 };
659
660 let nsid = Nsid::new(q.collection).map_err(|e| {
661 HttpError::for_bad_request(None, format!("collection was not a valid NSID: {e:?}"))
662 })?;
663
664 let (range_cursors, series) = storage
665 .get_timeseries(vec![nsid], since, until, step)
666 .await
667 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;
668
669 let range = range_cursors
670 .into_iter()
671 .map(|c| DateTime::<Utc>::from_timestamp_micros(c.to_raw_u64() as i64).unwrap())
672 .collect();
673
674 let series = series
675 .into_iter()
676 .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect()))
677 .collect();
678
679 OkCors(CollectionTimeseriesResponse { range, series }).into()
680 })
681 .await
682}
683
// Query string for `/search`.
#[derive(Debug, Deserialize, JsonSchema)]
struct SearchQuery {
    /// Query
    ///
    /// at least two alphanumeric (+hyphen) characters must be present
    q: String,
}
// Response body for `/search`.
#[derive(Debug, Serialize, JsonSchema)]
struct SearchResponse {
    matches: Vec<NsidCount>,
}
695/// Search lexicons
696#[endpoint {
697 method = GET,
698 path = "/search"
699}]
700async fn search_collections(
701 ctx: RequestContext<Context>,
702 query: Query<SearchQuery>,
703) -> OkCorsResponse<SearchResponse> {
704 let Context { storage, .. } = ctx.context();
705 let q = query.into_inner();
706 instrument_handler(&ctx, async {
707 // TODO: query validation
708 // TODO: also handle multi-space stuff (ufos-app tries to on client)
709 let terms: Vec<String> = q.q.split(' ').map(Into::into).collect();
710 let matches = storage
711 .search_collections(terms)
712 .await
713 .map_err(|e| HttpError::for_internal_error(format!("oh ugh: {e:?}")))?;
714 OkCors(SearchResponse { matches }).into()
715 })
716 .await
717}
718
/// Build the API description, render its OpenAPI spec, and run the HTTP
/// server (binds 0.0.0.0:9999; resolves only when the server stops or
/// fails to start).
pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
    describe_metrics();
    let log = ConfigLogging::StderrTerminal {
        level: ConfigLoggingLevel::Warn,
    }
    .to_logger("server")
    .map_err(|e| e.to_string())?;

    let mut api = ApiDescription::new();

    // register() only fails on conflicting paths/operation ids -- a
    // programming error -- so unwrapping at startup is acceptable.
    api.register(index).unwrap();
    api.register(get_openapi).unwrap();
    api.register(get_meta_info).unwrap();
    api.register(get_records_by_collections).unwrap();
    api.register(get_collection_stats).unwrap();
    api.register(get_collections).unwrap();
    api.register(get_prefix).unwrap();
    api.register(get_timeseries).unwrap();
    api.register(search_collections).unwrap();

    let context = Context {
        // Render the OpenAPI spec once here; `get_openapi` serves this value.
        spec: Arc::new(
            api.openapi(
                "UFOs API: Every lexicon in the ATmosphere",
                // Fall back to 0.0.1 (with a warning) if the cargo package
                // version isn't parseable as semver.
                env!("CARGO_PKG_VERSION")
                    .parse()
                    .inspect_err(|e| {
                        log::warn!("failed to parse cargo package version for openapi: {e:?}")
                    })
                    .unwrap_or(semver::Version::new(0, 0, 1)),
            )
            .description("Samples and statistics of atproto records by their collection NSID")
            .contact_name("part of @microcosm.blue")
            .contact_url("https://microcosm.blue")
            .json()
            .map_err(|e| e.to_string())?,
        ),
        storage: Box::new(storage),
    };

    ServerBuilder::new(api, context, log)
        .config(ConfigDropshot {
            bind_address: "0.0.0.0:9999".parse().unwrap(),
            ..Default::default()
        })
        .start()
        .map_err(|error| format!("failed to start server: {error}"))?
        .await
}