Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

deserialize multiple collections from query

blehhhhhh

Changed files
+121 -120
ufos
-1
ufos/src/lib.rs
···
pub mod error;
pub mod file_consumer;
pub mod index_html;
-
pub mod qs_query;
pub mod server;
pub mod storage;
pub mod storage_fjall;
-73
ufos/src/qs_query.rs
···
-
use async_trait::async_trait;
-
use dropshot::{
-
ApiEndpointBodyContentType, ExclusiveExtractor, ExtractorMetadata, HttpError, RequestContext,
-
RequestInfo, ServerContext, SharedExtractor,
-
};
-
/// copied from https://github.com/oxidecomputer/dropshot/blob/695e1d8872c988c43066eb0848c87c127eeda361/dropshot/src/extractor/query.rs
-
/// Apache 2.0: https://github.com/oxidecomputer/dropshot/blob/695e1d8872c988c43066eb0848c87c127eeda361/LICENSE
-
use schemars::JsonSchema;
-
use serde::de::DeserializeOwned;
-
-
/// `VecsAllowedQuery<QueryType>` is an extractor used to deserialize an
-
/// instance of `QueryType` from an HTTP request's query string. `QueryType`
-
/// is any structure of yours that implements [serde::Deserialize] and
-
/// [schemars::JsonSchema]. See the crate documentation for more information.
-
#[derive(Debug)]
-
pub struct VecsAllowedQuery<QueryType: DeserializeOwned + JsonSchema + Send + Sync> {
-
inner: QueryType,
-
}
-
impl<QueryType: DeserializeOwned + JsonSchema + Send + Sync> VecsAllowedQuery<QueryType> {
-
// TODO drop this in favor of Deref? + Display and Debug for convenience?
-
pub fn into_inner(self) -> QueryType {
-
self.inner
-
}
-
}
-
-
/// Given an HTTP request, pull out the query string and attempt to deserialize
-
/// it as an instance of `QueryType`.
-
fn http_request_load_query<QueryType>(
-
request: &RequestInfo,
-
) -> Result<VecsAllowedQuery<QueryType>, HttpError>
-
where
-
QueryType: DeserializeOwned + JsonSchema + Send + Sync,
-
{
-
let raw_query_string = request.uri().query().unwrap_or("");
-
// TODO-correctness: are query strings defined to be urlencoded in this way?
-
match serde_qs::from_str(raw_query_string) {
-
Ok(q) => Ok(VecsAllowedQuery { inner: q }),
-
Err(e) => Err(HttpError::for_bad_request(
-
None,
-
format!("unable to parse query string: {}", e),
-
)),
-
}
-
}
-
-
// The `SharedExtractor` implementation for Query<QueryType> describes how to
-
// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
-
// parsing the query string to an instance of `QueryType`.
-
// TODO-cleanup We shouldn't have to use the "'static" bound on `QueryType`
-
// here. It seems like we ought to be able to use 'async_trait, but that
-
// doesn't seem to be defined.
-
#[async_trait]
-
impl<QueryType> SharedExtractor for VecsAllowedQuery<QueryType>
-
where
-
QueryType: JsonSchema + DeserializeOwned + Send + Sync + 'static,
-
{
-
async fn from_request<Context: ServerContext>(
-
rqctx: &RequestContext<Context>,
-
) -> Result<VecsAllowedQuery<QueryType>, HttpError> {
-
http_request_load_query(&rqctx.request)
-
}
-
-
fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
-
// HACK: would love to use Query here but it "helpfully" panics when it sees a Vec.
-
// we can't really get at enough of Query's logic to use it directly, sadly, so the
-
// resulting openapi docs suck (query params are listed as body payload, example
-
// codes make no sense, etc.)
-
//
-
// trying to hack the resulting ExtractorMetadata to look like Query's is a pain:
-
// things almost work out but then something in dropshot won't be `pub` and it falls
-
// apart. maybe it's possible, i didn't get it in the time i had.
-
dropshot::TypedBody::<QueryType>::metadata(body_content_type)
-
}
-
}
+26 -46
ufos/src/server.rs ufos/src/server/mod.rs
···
+
mod collections_query;
+
mod cors;
+
use crate::index_html::INDEX_HTML;
-
use crate::qs_query::VecsAllowedQuery;
use crate::storage::StoreReader;
use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor};
use crate::{ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use chrono::{DateTime, Utc};
+
use collections_query::MultiCollectionQuery;
+
use cors::{OkCors, OkCorsResponse};
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::Body;
···
use dropshot::ConfigLogging;
use dropshot::ConfigLoggingLevel;
use dropshot::HttpError;
-
use dropshot::HttpResponseHeaders;
-
use dropshot::HttpResponseOk;
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ServerBuilder;
···
}]
async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
let spec = (*ctx.context().spec).clone();
-
ok_cors(spec)
+
OkCors(spec).into()
}
#[derive(Debug, Serialize, JsonSchema)]
···
.await
.map_err(failed_to_get("consumer info"))?;
-
ok_cors(MetaInfo {
+
OkCors(MetaInfo {
storage_name: storage.name(),
storage: storage_info,
consumer,
})
+
.into()
}
// TODO: replace with normal (🙃) multi-qs value somehow
···
.map(|r| r.into())
.collect();
-
ok_cors(records)
+
OkCors(records).into()
}
#[derive(Debug, Deserialize, JsonSchema)]
-
struct TotalSeenCollectionsQuery {
-
collection: Vec<String>, // JsonSchema not implemented for Nsid :(
+
struct CollectionsStatsQuery {
/// Limit stats to those seen after this UTC datetime
///
/// default: 1 week ago
···
}
/// Collection stats
///
-
/// Get stats for a collection over a specific time period
-
///
-
/// API docs note: the **Body** fields here are actually query parameters!!
-
///
-
/// Due to limitations with dropshot's query parsing (no support for sequences),
-
/// this is kind of the best i could do for now. sadly.
+
/// Get record statistics for collections during a specific time period
#[endpoint {
method = GET,
path = "/collections/stats"
}]
-
async fn get_records_total_seen(
+
async fn get_collection_stats(
ctx: RequestContext<Context>,
-
query: VecsAllowedQuery<TotalSeenCollectionsQuery>,
+
collections_query: MultiCollectionQuery,
+
query: Query<CollectionsStatsQuery>,
) -> OkCorsResponse<HashMap<String, TotalCounts>> {
let Context { storage, .. } = ctx.context();
let q = query.into_inner();
-
-
log::warn!("collection: {:?}", q.collection);
-
-
let mut collections = Vec::with_capacity(q.collection.len());
-
for c in q.collection {
-
let Ok(nsid) = Nsid::new(c.clone()) else {
-
return Err(HttpError::for_bad_request(
-
None,
-
format!("could not parse collection to nsid: {c}"),
-
));
-
};
-
collections.push(nsid);
-
}
+
let collections: HashSet<Nsid> = collections_query.try_into()?;
-
let since = q.since.map(dt_to_cursor).transpose()?;
-
let until = q.until.map(dt_to_cursor).transpose()?;
+
let _since = q.since.map(dt_to_cursor).transpose()?;
+
let _until = q.until.map(dt_to_cursor).transpose()?;
let mut seen_by_collection = HashMap::with_capacity(collections.len());
···
);
}
-
ok_cors(seen_by_collection)
+
OkCors(seen_by_collection).into()
}
#[derive(Debug, Serialize, JsonSchema)]
···
order: Option<CollectionsQueryOrder>,
}
-
/// List collections (with stats)
+
/// List collections
+
///
+
/// With statistics.
///
/// ## To fetch a full list:
///
···
let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));
-
ok_cors(CollectionsResponse {
+
OkCors(CollectionsResponse {
collections,
cursor: next_cursor,
})
+
.into()
}
#[derive(Debug, Deserialize, JsonSchema)]
···
let step = if let Some(secs) = q.step {
if secs < 3600 {
let msg = format!("step is too small: {}", secs);
-
return Err(HttpError::for_bad_request(None, msg));
+
Err(HttpError::for_bad_request(None, msg))?;
}
(secs / 3600) * 3600 // truncate to hour
} else {
···
.map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect()))
.collect();
-
ok_cors(CollectionTimeseriesResponse { range, series })
+
OkCors(CollectionTimeseriesResponse { range, series }).into()
}
pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
···
api.register(get_openapi).unwrap();
api.register(get_meta_info).unwrap();
api.register(get_records_by_collections).unwrap();
-
api.register(get_records_total_seen).unwrap();
+
api.register(get_collection_stats).unwrap();
api.register(get_collections).unwrap();
api.register(get_timeseries).unwrap();
···
.map_err(|error| format!("failed to start server: {}", error))?
.await
}
-
-
/// awkward helpers
-
type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
-
fn ok_cors<T: Send + Sync + Serialize + JsonSchema>(t: T) -> OkCorsResponse<T> {
-
let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(t));
-
res.headers_mut()
-
.insert("access-control-allow-origin", "*".parse().unwrap());
-
Ok(res)
-
}
+72
ufos/src/server/collections_query.rs
···
+
use crate::Nsid;
+
use async_trait::async_trait;
+
use dropshot::{
+
ApiEndpointBodyContentType, ExtractorMetadata, HttpError, Query, RequestContext, ServerContext,
+
SharedExtractor,
+
};
+
use schemars::JsonSchema;
+
use serde::Deserialize;
+
use std::collections::HashSet;
+
+
/// The real type that gets deserialized
+
#[derive(Debug, Deserialize, JsonSchema)]
+
pub struct MultiCollectionQuery {
+
pub collection: Vec<String>,
+
}
+
+
/// The fake stand-in type, used only for docs, with no vec for dropshot to
+
/// freak out about
+
#[derive(Deserialize, JsonSchema)]
+
#[allow(dead_code)]
+
struct MultiCollectionQueryForDocs {
+
/// One or more collection [NSID](https://atproto.com/specs/nsid)s
+
///
+
/// Pass this parameter multiple times to specify multiple collections, like
+
/// `collection=app.bsky.feed.like&collection=app.bsky.feed.post`
+
collection: String,
+
}
+
+
impl TryFrom<MultiCollectionQuery> for HashSet<Nsid> {
+
type Error = HttpError;
+
fn try_from(mcq: MultiCollectionQuery) -> Result<Self, Self::Error> {
+
let mut out = HashSet::with_capacity(mcq.collection.len());
+
for c in mcq.collection {
+
let nsid = Nsid::new(c).map_err(|e| {
+
HttpError::for_bad_request(
+
None,
+
format!("failed to convert collection to an NSID: {e:?}"),
+
)
+
})?;
+
out.insert(nsid);
+
}
+
Ok(out)
+
}
+
}
+
+
// The `SharedExtractor` implementation for `MultiCollectionQuery` describes how
+
// to construct an instance of it from an HTTP request: namely, by parsing the
+
// query string with `serde_qs`.
+
#[async_trait]
+
impl SharedExtractor for MultiCollectionQuery {
+
async fn from_request<Context: ServerContext>(
+
ctx: &RequestContext<Context>,
+
) -> Result<MultiCollectionQuery, HttpError> {
+
let raw_query = ctx.request.uri().query().unwrap_or("");
+
let q = serde_qs::from_str(raw_query).map_err(|e| {
+
HttpError::for_bad_request(None, format!("unable to parse query string: {}", e))
+
})?;
+
Ok(q)
+
}
+
+
fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
+
// HACK: query type switcheroo: passing MultiCollectionQuery to
+
// `metadata` would "helpfully" panic because dropshot believes we can
+
// only have scalar types in a query.
+
//
+
// so instead we have a fake second type whose only job is to look the
+
// same as MultiCollectionQuery except that it has `String` instead of
+
// `Vec<String>`, which dropshot will accept, and generate ~close-enough
+
// docs for.
+
<Query<MultiCollectionQueryForDocs> as SharedExtractor>::metadata(body_content_type)
+
}
+
}
+23
ufos/src/server/cors.rs
···
+
use dropshot::{HttpError, HttpResponseHeaders, HttpResponseOk};
+
use schemars::JsonSchema;
+
use serde::Serialize;
+
+
pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
+
+
/// Helper for constructing Ok responses: return OkCors(T).into()
+
/// (not happy with this yet)
+
pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
+
+
impl<T> From<OkCors<T>> for OkCorsResponse<T>
+
where
+
T: Serialize + JsonSchema + Send + Sync,
+
{
+
fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
+
let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
+
res.headers_mut()
+
.insert("access-control-allow-origin", "*".parse().unwrap());
+
Ok(res)
+
}
+
}
+
+
// TODO: cors for HttpError