Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

include all counts in reponses

Changed files
+56 -38
ufos
+29 -3
ufos/src/lib.rs
···
use crate::db_types::{EncodingError, EncodingResult};
use crate::error::BatchInsertError;
-
use crate::store_types::SketchSecretPrefix;
use cardinality_estimator_safe::{Element, Sketch};
use error::FirehoseEventError;
use jetstream::events::{CommitEvent, CommitOp, Cursor};
···
pub struct NsidCount {
nsid: String,
creates: u64,
-
// TODO: add updates and deletes
dids_estimate: u64,
}
#[derive(Debug, PartialEq, Serialize, JsonSchema)]
pub struct PrefixCount {
prefix: String,
creates: u64,
-
// TODO: add updates and deletes
dids_estimate: u64,
}
#[derive(Debug, PartialEq, Serialize, JsonSchema)]
···
use crate::db_types::{EncodingError, EncodingResult};
use crate::error::BatchInsertError;
+
use crate::store_types::{CountsValue, SketchSecretPrefix};
use cardinality_estimator_safe::{Element, Sketch};
use error::FirehoseEventError;
use jetstream::events::{CommitEvent, CommitOp, Cursor};
···
pub struct NsidCount {
nsid: String,
creates: u64,
+
updates: u64,
+
deletes: u64,
dids_estimate: u64,
+
}
+
impl NsidCount {
+
pub fn new(nsid: &Nsid, counts: &CountsValue) -> Self {
+
let crud = counts.counts();
+
Self {
+
nsid: nsid.to_string(),
+
creates: crud.creates,
+
updates: crud.updates,
+
deletes: crud.deletes,
+
dids_estimate: counts.dids().estimate() as u64,
+
}
+
}
}
#[derive(Debug, PartialEq, Serialize, JsonSchema)]
pub struct PrefixCount {
prefix: String,
creates: u64,
+
updates: u64,
+
deletes: u64,
dids_estimate: u64,
+
}
+
impl PrefixCount {
+
pub fn new(prefix: &str, counts: &CountsValue) -> Self {
+
let crud = counts.counts();
+
Self {
+
prefix: prefix.to_string(),
+
creates: crud.creates,
+
updates: crud.updates,
+
deletes: crud.deletes,
+
dids_estimate: counts.dids().estimate() as u64,
+
}
+
}
}
#[derive(Debug, PartialEq, Serialize, JsonSchema)]
+27 -35
ufos/src/storage_fjall.rs
···
merged.merge(&counts);
}
}
-
out.push(NsidCount {
-
nsid: nsid.to_string(),
-
creates: merged.counts().creates,
-
dids_estimate: merged.dids().estimate() as u64,
-
});
}
let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
···
.into_iter()
.rev()
.take(limit)
-
.map(|(nsid, cv)| NsidCount {
-
nsid: nsid.to_string(),
-
creates: cv.counts().creates,
-
dids_estimate: cv.dids().estimate() as u64,
-
})
.collect();
Ok(counts)
}
···
let mut prefix_count = CountsValue::default();
#[derive(Debug, Clone, PartialEq)]
enum Child {
-
FullNsid(String),
ChildPrefix(String),
}
impl Child {
fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
if prefix.is_group_of(nsid) {
-
return Some(Child::FullNsid(nsid.to_string()));
}
let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
let (segment, _) = suffix.split_once('.').unwrap();
···
}
fn is_before(&self, other: &Child) -> bool {
match (self, other) {
-
(Child::FullNsid(s), Child::ChildPrefix(o)) if s == o => true,
-
(Child::ChildPrefix(s), Child::FullNsid(o)) if s == o => false,
-
(Child::FullNsid(s), Child::FullNsid(o)) => s < o,
(Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
-
(Child::FullNsid(s), Child::ChildPrefix(o)) => s < o,
-
(Child::ChildPrefix(s), Child::FullNsid(o)) => s < o,
}
}
fn into_inner(self) -> String {
match self {
-
Child::FullNsid(s) => s,
Child::ChildPrefix(s) => s,
}
}
···
}
}
items.push(match child {
-
Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount {
-
nsid,
-
creates: merged.counts().creates,
-
dids_estimate: merged.dids().estimate() as u64,
-
}),
-
Child::ChildPrefix(prefix) => PrefixChild::Prefix(PrefixCount {
-
prefix,
-
creates: merged.counts().creates,
-
dids_estimate: merged.dids().estimate() as u64,
-
}),
});
}
···
for kv in self.rollups.range((start, end)) {
let (key_bytes, val_bytes) = kv?;
let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
-
let nsid = key.collection().as_str().to_string();
for term in &terms {
if nsid.contains(term) {
let counts = db_complete::<CountsValue>(&val_bytes)?;
-
matches.push(NsidCount {
-
nsid: nsid.clone(),
-
creates: counts.counts().creates,
-
dids_estimate: counts.dids().estimate() as u64,
-
});
break;
}
}
···
vec![PrefixChild::Collection(NsidCount {
nsid: "a.a.a".to_string(),
creates: 1,
dids_estimate: 1
}),]
);
···
vec![PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a".to_string(),
creates: 1,
-
dids_estimate: 1
}),]
);
assert_eq!(cursor, None);
···
vec![PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a".to_string(),
creates: 2,
dids_estimate: 1
}),]
);
···
PrefixChild::Collection(NsidCount {
nsid: "a.a.a.a".to_string(),
creates: 1,
dids_estimate: 1
}),
PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a.a".to_string(),
creates: 1,
dids_estimate: 1
}),
]
···
merged.merge(&counts);
}
}
+
out.push(NsidCount::new(&nsid, &merged));
}
let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
···
.into_iter()
.rev()
.take(limit)
+
.map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
.collect();
Ok(counts)
}
···
let mut prefix_count = CountsValue::default();
#[derive(Debug, Clone, PartialEq)]
enum Child {
+
FullNsid(Nsid),
ChildPrefix(String),
}
impl Child {
fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
if prefix.is_group_of(nsid) {
+
return Some(Child::FullNsid(nsid.clone()));
}
let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
let (segment, _) = suffix.split_once('.').unwrap();
···
}
fn is_before(&self, other: &Child) -> bool {
match (self, other) {
+
(Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
+
(Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
+
(Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
(Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
+
(Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
+
(Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
}
}
fn into_inner(self) -> String {
match self {
+
Child::FullNsid(s) => s.to_string(),
Child::ChildPrefix(s) => s,
}
}
···
}
}
items.push(match child {
+
Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
+
Child::ChildPrefix(prefix) => {
+
PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
+
}
});
}
···
for kv in self.rollups.range((start, end)) {
let (key_bytes, val_bytes) = kv?;
let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
+
let nsid = key.collection();
for term in &terms {
if nsid.contains(term) {
let counts = db_complete::<CountsValue>(&val_bytes)?;
+
matches.push(NsidCount::new(nsid, &counts));
break;
}
}
···
vec![PrefixChild::Collection(NsidCount {
nsid: "a.a.a".to_string(),
creates: 1,
+
updates: 0,
+
deletes: 0,
dids_estimate: 1
}),]
);
···
vec![PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a".to_string(),
creates: 1,
+
updates: 0,
+
deletes: 0,
+
dids_estimate: 1,
}),]
);
assert_eq!(cursor, None);
···
vec![PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a".to_string(),
creates: 2,
+
updates: 0,
+
deletes: 0,
dids_estimate: 1
}),]
);
···
PrefixChild::Collection(NsidCount {
nsid: "a.a.a.a".to_string(),
creates: 1,
+
updates: 0,
+
deletes: 0,
dids_estimate: 1
}),
PrefixChild::Prefix(PrefixCount {
prefix: "a.a.a.a".to_string(),
creates: 1,
+
updates: 0,
+
deletes: 0,
dids_estimate: 1
}),
]