Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

dropshot mult-value query param: pain

Changed files
+132 -11
ufos
+14
Cargo.lock
···
[[package]]
+
name = "serde_qs"
+
version = "1.0.0-rc.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "4cb0b9062a400c31442e67d1f2b1e7746bebd691110ebee1b7d0c7293b04fab1"
+
dependencies = [
+
"itoa",
+
"percent-encoding",
+
"ryu",
+
"serde",
+
"thiserror 2.0.12",
+
]
+
+
[[package]]
name = "serde_spanned"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"semver",
"serde",
"serde_json",
+
"serde_qs",
"sha2",
"tempfile",
"thiserror 2.0.12",
+1
ufos/Cargo.toml
···
semver = "1.0.26"
serde = "1.0.219"
serde_json = "1.0.140"
+
serde_qs = "1.0.0-rc.3"
sha2 = "0.10.9"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
+1
ufos/src/lib.rs
···
pub mod error;
pub mod file_consumer;
pub mod index_html;
+
pub mod qs_query;
pub mod server;
pub mod storage;
pub mod storage_fjall;
+73
ufos/src/qs_query.rs
···
+
use async_trait::async_trait;
+
use dropshot::{
+
ApiEndpointBodyContentType, ExclusiveExtractor, ExtractorMetadata, HttpError, RequestContext,
+
RequestInfo, ServerContext, SharedExtractor,
+
};
+
/// copied from https://github.com/oxidecomputer/dropshot/blob/695e1d8872c988c43066eb0848c87c127eeda361/dropshot/src/extractor/query.rs
+
/// Apache 2.0: https://github.com/oxidecomputer/dropshot/blob/695e1d8872c988c43066eb0848c87c127eeda361/LICENSE
+
use schemars::JsonSchema;
+
use serde::de::DeserializeOwned;
+
+
/// `VecsAllowedQuery<QueryType>` is an extractor used to deserialize an
+
/// instance of `QueryType` from an HTTP request's query string. `QueryType`
+
/// is any structure of yours that implements [serde::Deserialize] and
+
/// [schemars::JsonSchema]. See the crate documentation for more information.
+
#[derive(Debug)]
+
pub struct VecsAllowedQuery<QueryType: DeserializeOwned + JsonSchema + Send + Sync> {
+
inner: QueryType,
+
}
+
impl<QueryType: DeserializeOwned + JsonSchema + Send + Sync> VecsAllowedQuery<QueryType> {
+
// TODO drop this in favor of Deref? + Display and Debug for convenience?
+
pub fn into_inner(self) -> QueryType {
+
self.inner
+
}
+
}
+
+
/// Given an HTTP request, pull out the query string and attempt to deserialize
+
/// it as an instance of `QueryType`.
+
fn http_request_load_query<QueryType>(
+
request: &RequestInfo,
+
) -> Result<VecsAllowedQuery<QueryType>, HttpError>
+
where
+
QueryType: DeserializeOwned + JsonSchema + Send + Sync,
+
{
+
let raw_query_string = request.uri().query().unwrap_or("");
+
// TODO-correctness: are query strings defined to be urlencoded in this way?
+
match serde_qs::from_str(raw_query_string) {
+
Ok(q) => Ok(VecsAllowedQuery { inner: q }),
+
Err(e) => Err(HttpError::for_bad_request(
+
None,
+
format!("unable to parse query string: {}", e),
+
)),
+
}
+
}
+
+
// The `SharedExtractor` implementation for Query<QueryType> describes how to
+
// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
+
// parsing the query string to an instance of `QueryType`.
+
// TODO-cleanup We shouldn't have to use the "'static" bound on `QueryType`
+
// here. It seems like we ought to be able to use 'async_trait, but that
+
// doesn't seem to be defined.
+
#[async_trait]
+
impl<QueryType> SharedExtractor for VecsAllowedQuery<QueryType>
+
where
+
QueryType: JsonSchema + DeserializeOwned + Send + Sync + 'static,
+
{
+
async fn from_request<Context: ServerContext>(
+
rqctx: &RequestContext<Context>,
+
) -> Result<VecsAllowedQuery<QueryType>, HttpError> {
+
http_request_load_query(&rqctx.request)
+
}
+
+
fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
+
// HACK: would love to use Query here but it "helpfully" panics when it sees a Vec.
+
// we can't really get at enough of Query's logic to use it directly, sadly, so the
+
// resulting openapi docs suck (query params are listed as body payload, example
+
// codes make no sense, etc.)
+
//
+
// trying to hack the resulting ExtractorMetadata to look like Query's is a pain:
+
// things almost work out but then something in dropshot won't be `pub` and it falls
+
// apart. maybe it's possible, i didn't get it in the time i had.
+
dropshot::TypedBody::<QueryType>::metadata(body_content_type)
+
}
+
}
+43 -11
ufos/src/server.rs
···
use crate::index_html::INDEX_HTML;
+
use crate::qs_query::VecsAllowedQuery;
use crate::storage::StoreReader;
use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor};
use crate::{ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord};
···
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ServerBuilder;
+
use http::{Response, StatusCode};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
···
storage: serde_json::Value,
consumer: ConsumerInfo,
}
-
/// Get meta information about UFOs itself
+
/// UFOs meta-info
#[endpoint {
method = GET,
path = "/meta"
···
}
}
}
-
/// Get recent records by collection
+
/// Record samples
+
///
+
/// Get most recent records seen in the firehose, by collection NSID
///
/// Multiple collections are supported. They will be delivered in one big array with no
/// specified order.
···
#[derive(Debug, Deserialize, JsonSchema)]
struct TotalSeenCollectionsQuery {
-
collection: String, // JsonSchema not implemented for Nsid :(
+
collection: Vec<String>, // JsonSchema not implemented for Nsid :(
+
/// Limit stats to those seen after this UTC datetime
+
///
+
/// default: 1 week ago
+
since: Option<DateTime<Utc>>,
+
/// Limit stats to those seen before this UTC datetime
+
///
+
/// default: now
+
until: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, JsonSchema)]
struct TotalCounts {
total_creates: u64,
dids_estimate: u64,
}
-
/// Get total records seen by collection
+
/// Collection stats
+
///
+
/// Get stats for a collection over a specific time period
+
///
+
/// API docs note: the **Body** fields here are actually query parameters!!
+
///
+
/// Due to limitations with dropshot's query parsing (no support for sequences),
+
/// this is kind of the best i could do for now. sadly.
#[endpoint {
method = GET,
-
path = "/records/total-seen"
+
path = "/collections/stats"
}]
async fn get_records_total_seen(
ctx: RequestContext<Context>,
-
collection_query: Query<TotalSeenCollectionsQuery>,
+
query: VecsAllowedQuery<TotalSeenCollectionsQuery>,
) -> OkCorsResponse<HashMap<String, TotalCounts>> {
let Context { storage, .. } = ctx.context();
+
let q = query.into_inner();
-
let query = collection_query.into_inner();
-
let collections = to_multiple_nsids(&query.collection)
-
.map_err(|reason| HttpError::for_bad_request(None, reason))?;
+
log::warn!("collection: {:?}", q.collection);
+
+
let mut collections = Vec::with_capacity(q.collection.len());
+
for c in q.collection {
+
let Ok(nsid) = Nsid::new(c.clone()) else {
+
return Err(HttpError::for_bad_request(
+
None,
+
format!("could not parse collection to nsid: {c}"),
+
));
+
};
+
collections.push(nsid);
+
}
+
+
let since = q.since.map(dt_to_cursor).transpose()?;
+
let until = q.until.map(dt_to_cursor).transpose()?;
let mut seen_by_collection = HashMap::with_capacity(collections.len());
···
order: Option<CollectionsQueryOrder>,
}
-
/// Get collection with statistics
+
/// List collections (with stats)
///
/// ## To fetch a full list:
///
···
range: Vec<DateTime<Utc>>,
series: HashMap<String, Vec<JustCount>>,
}
-
/// Get timeseries data
+
/// Collection timeseries stats
#[endpoint {
method = GET,
path = "/timeseries"