Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

first pass on a search endpoint

Changed files
+67
ufos
+33
ufos/src/server/mod.rs
···
OkCors(CollectionTimeseriesResponse { range, series }).into()
}
pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
let log = ConfigLogging::StderrTerminal {
level: ConfigLoggingLevel::Info,
···
api.register(get_collections).unwrap();
api.register(get_prefix).unwrap();
api.register(get_timeseries).unwrap();
let context = Context {
spec: Arc::new(
···
OkCors(CollectionTimeseriesResponse { range, series }).into()
}
+
#[derive(Debug, Deserialize, JsonSchema)]
+
struct SearchQuery {
+
/// Query
+
///
+
/// at least two alphanumeric (+hyphen) characters must be present
+
q: String,
+
}
+
#[derive(Debug, Serialize, JsonSchema)]
+
struct SearchResponse {
+
matches: Vec<NsidCount>,
+
}
+
/// Search lexicons
+
#[endpoint {
+
method = GET,
+
path = "/search"
+
}]
+
async fn search_collections(
+
ctx: RequestContext<Context>,
+
query: Query<SearchQuery>,
+
) -> OkCorsResponse<SearchResponse> {
+
let Context { storage, .. } = ctx.context();
+
let q = query.into_inner();
+
// TODO: query validation
+
// TODO: also handle multi-space stuff (ufos-app tries to on client)
+
let terms: Vec<String> = q.q.split(' ').map(Into::into).collect();
+
let matches = storage
+
.search_collections(terms)
+
.await
+
.map_err(|e| HttpError::for_internal_error(format!("oh ugh: {e:?}")))?;
+
OkCors(SearchResponse { matches }).into()
+
}
+
pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
let log = ConfigLogging::StderrTerminal {
level: ConfigLoggingLevel::Info,
···
api.register(get_collections).unwrap();
api.register(get_prefix).unwrap();
api.register(get_timeseries).unwrap();
+
api.register(search_collections).unwrap();
let context = Context {
spec: Arc::new(
+2
ufos/src/storage.rs
···
limit: usize,
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>>;
}
···
limit: usize,
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>>;
+
+
async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>;
}
+32
ufos/src/storage_fjall.rs
···
}
Ok(merged)
}
}
#[async_trait]
···
FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
})
.await?
}
}
···
}
Ok(merged)
}
+
+
fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
+
let start = AllTimeRollupKey::start()?;
+
let end = AllTimeRollupKey::end()?;
+
let mut matches = Vec::new();
+
let limit = 16; // TODO: param
+
for kv in self.rollups.range((start, end)) {
+
let (key_bytes, val_bytes) = kv?;
+
let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
+
let nsid = key.collection().as_str().to_string();
+
for term in &terms {
+
if nsid.contains(term) {
+
let counts = db_complete::<CountsValue>(&val_bytes)?;
+
matches.push(NsidCount {
+
nsid: nsid.clone(),
+
creates: counts.counts().creates,
+
dids_estimate: counts.dids().estimate() as u64,
+
});
+
break;
+
}
+
}
+
if matches.len() >= limit {
+
break;
+
}
+
}
+
// TODO: indicate incomplete results
+
Ok(matches)
+
}
}
#[async_trait]
···
FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
})
.await?
+
}
+
async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
+
let s = self.clone();
+
tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
}
}