Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

count dids for all events, totals by create/up/del

+1 -1
ufos/fuzz/fuzz_targets/counts_value.rs
···
assert_eq!(serialized.len(), n);
let (and_back, n_again) = CountsValue::from_db_bytes(&serialized).unwrap();
assert_eq!(n_again, n);
-
assert_eq!(and_back.records(), counts_value.records());
+
assert_eq!(and_back.counts(), counts_value.counts());
assert_eq!(and_back.dids().estimate(), counts_value.dids().estimate());
}
});
+9
ufos/src/db_types.rs
···
}
}
+
impl<P: DbBytes + Default, S: DbBytes + Default> Default for DbConcat<P, S> {
// A default `DbConcat` is simply the default prefix paired with the default
// suffix; the impl is only available when both halves implement `Default`.
+
fn default() -> Self {
+
Self {
+
prefix: Default::default(),
+
suffix: Default::default(),
+
}
+
}
+
}
+
impl<P: DbBytes + std::fmt::Debug, S: DbBytes + std::fmt::Debug> fmt::Debug for DbConcat<P, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DbConcat<{:?} || {:?}>", self.prefix, self.suffix)
+71 -25
ufos/src/lib.rs
···
// Batch of commits plus running counters, stored per NSID by the enclosing
// batch type. `LIMIT` is the maximum number of commits kept in `commits`
// (`truncating_insert` only pushes while `commits.len() < LIMIT`).
#[derive(Debug, Default, Clone)]
pub struct CollectionCommits<const LIMIT: usize> {
-
pub total_seen: usize,
+
pub creates: usize, // bumped for each `Put` with `is_update: false`
+
pub updates: usize, // bumped for each `Put` with `is_update: true`
+
pub deletes: usize, // bumped for each `Cut`
pub dids_estimate: Sketch<14>, // in the added code, fed with the DID element of every inserted commit
pub commits: Vec<UFOsCommit>, // the retained commits themselves
head: usize, // starting point of the displacement scan in `truncating_insert` (scan body is elided here — confirm exact semantics)
-
non_creates: usize,
}
impl<const LIMIT: usize> CollectionCommits<LIMIT> {
···
self.head = 0;
}
}
+
/// lossy-ish commit insertion
+
///
+
/// - new commits are *always* added to the batch or else rejected as full.
+
/// - when LIMIT is reached, new commits can displace existing `creates`.
+
/// `update`s and `delete`s are *never* displaced.
+
/// - if all batched `creates` have been displaced, the batch is full.
+
///
+
/// in general it's rare for commits to be displaced except for very high-
+
/// volume collections such as `app.bsky.feed.like`.
+
///
+
/// it could be nice in the future to retain all batched commits and just
+
/// drop new `creates` after a limit instead.
pub fn truncating_insert(
&mut self,
commit: UFOsCommit,
sketch_secret: &SketchSecretPrefix,
) -> Result<(), BatchInsertError> {
-
if self.non_creates == LIMIT {
+
if (self.updates + self.deletes) == LIMIT {
+
// nothing can be displaced (only `create`s may be displaced)
return Err(BatchInsertError::BatchFull(commit));
}
-
let did = commit.did.clone();
-
let is_create = commit.action.is_create();
+
+
// every kind of commit counts as "user activity"
+
self.dids_estimate
+
.insert(did_element(sketch_secret, &commit.did));
+
+
match commit.action {
+
CommitAction::Put(PutAction {
+
is_update: false, ..
+
}) => {
+
self.creates += 1;
+
}
+
CommitAction::Put(PutAction {
+
is_update: true, ..
+
}) => {
+
self.updates += 1;
+
}
+
CommitAction::Cut => {
+
self.deletes += 1;
+
}
+
}
+
if self.commits.len() < LIMIT {
+
// normal insert: there's space left to put a new commit at the end
self.commits.push(commit);
-
if self.commits.capacity() > LIMIT {
-
self.commits.shrink_to(LIMIT); // save mem?????? maybe??
-
}
} else {
+
// displacement insert: find an old `create` we can displace
let head_started_at = self.head;
loop {
let candidate = self
···
}
}
-
if is_create {
-
self.total_seen += 1;
-
self.dids_estimate.insert(did_element(sketch_secret, &did));
-
} else {
-
self.non_creates += 1;
-
}
-
Ok(())
}
}
···
.truncating_insert(commit, sketch_secret)?;
Ok(())
}
-
pub fn total_records(&self) -> usize {
-
self.commits_by_nsid.values().map(|v| v.commits.len()).sum()
-
}
-
pub fn total_seen(&self) -> usize {
-
self.commits_by_nsid.values().map(|v| v.total_seen).sum()
-
}
pub fn total_collections(&self) -> usize {
self.commits_by_nsid.len()
}
···
// Serializable per-collection summary: the collection's NSID with its create
// count and estimated number of distinct DIDs.
#[derive(Debug, Serialize, JsonSchema)]
pub struct NsidCount {
nsid: String, // filled from `nsid.to_string()` by the storage layer
-
records: u64,
+
creates: u64, // filled from `counts().creates` (replaces the old `records` field)
dids_estimate: u64, // filled from `dids().estimate() as u64`
}
// Serializable counts without an NSID; built from a `&CountsValue` via the
// `From` impl in store_types.rs.
#[derive(Debug, Serialize, JsonSchema)]
pub struct JustCount {
-
records: u64,
+
creates: u64, // `cv.counts().creates` (replaces the old `records` field)
dids_estimate: u64, // `cv.dids().estimate() as u64`
}
···
&[0u8; 16],
)?;
-
assert_eq!(commits.total_seen, 3);
+
assert_eq!(commits.creates, 3);
assert_eq!(commits.dids_estimate.estimate(), 1);
assert_eq!(commits.commits.len(), 2);
···
}
// A single update commit must be tallied under `updates` only, while still
// contributing its DID to the estimate and being retained in the batch.
#[test]
+
fn test_truncating_insert_counts_updates() -> anyhow::Result<()> {
+
let mut commits: CollectionCommits<2> = Default::default();
+
+
commits.truncating_insert(
+
UFOsCommit {
+
cursor: Cursor::from_raw_u64(100),
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
+
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
+
rev: "rev-asdf".to_string(),
+
action: CommitAction::Put(PutAction {
+
record: RawValue::from_string("{}".to_string())?,
+
is_update: true, // this is what makes the commit an update rather than a create
+
}),
+
},
+
&[0u8; 16], // fixed all-zero sketch secret, same as the neighbouring tests
+
)?;
+
+
assert_eq!(commits.creates, 0);
+
assert_eq!(commits.updates, 1);
+
assert_eq!(commits.deletes, 0);
+
assert_eq!(commits.dids_estimate.estimate(), 1); // updates count towards the DID estimate too
+
assert_eq!(commits.commits.len(), 1);
+
Ok(())
+
}
+
+
#[test]
fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
let mut commits: CollectionCommits<2> = Default::default();
···
&[0u8; 16],
)?;
-
assert_eq!(commits.total_seen, 2);
+
assert_eq!(commits.creates, 2);
+
assert_eq!(commits.deletes, 1);
assert_eq!(commits.dids_estimate.estimate(), 1);
assert_eq!(commits.commits.len(), 2);
+5 -3
ufos/src/server.rs
···
consumer,
})
}
+
+
// TODO: replace with normal (🙃) multi-qs value somehow
fn to_multiple_nsids(s: &str) -> Result<HashSet<Nsid>, String> {
let mut out = HashSet::new();
for collection in s.split(',') {
···
}
// Per-collection entry of the counts response: the handler below fills one of
// these for each requested collection from `storage.get_counts_by_collection`.
#[derive(Debug, Serialize, JsonSchema)]
struct TotalCounts {
-
total_records: u64,
+
total_creates: u64, // first element of the `get_counts_by_collection` tuple (replaces `total_records`)
dids_estimate: u64, // second element of the `get_counts_by_collection` tuple
}
/// Get total records seen by collection
···
let mut seen_by_collection = HashMap::with_capacity(collections.len());
for collection in &collections {
-
let (total_records, dids_estimate) = storage
+
let (total_creates, dids_estimate) = storage
.get_counts_by_collection(collection)
.await
.map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
···
seen_by_collection.insert(
collection.to_string(),
TotalCounts {
-
total_records,
+
total_creates,
dids_estimate,
},
);
+66 -62
ufos/src/storage_fjall.rs
···
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
-
AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, CursorBucket,
+
AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
···
}
out.push(NsidCount {
nsid: nsid.to_string(),
-
records: merged.records(),
+
creates: merged.counts().creates,
dids_estimate: merged.dids().estimate() as u64,
});
}
···
}
let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
match order {
-
OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.records()),
+
OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
OrderCollectionsBy::Lexi { .. } => unreachable!(),
}
···
.take(limit)
.map(|(nsid, cv)| NsidCount {
nsid: nsid.to_string(),
-
records: cv.records(),
+
creates: cv.counts().creates,
dids_estimate: cv.dids().estimate() as u64,
})
.collect();
···
}
}
Ok((
-
total_counts.records(),
+
total_counts.counts().creates,
total_counts.dids().estimate() as u64,
))
}
···
.unwrap_or_default();
// now that we have values, we can know the existing ranks
-
let before_records_count = rolled.records();
+
let before_creates_count = rolled.counts().creates;
let before_dids_estimate = rolled.dids().estimate() as u64;
// update the rollup
rolled.merge(&counts);
-
// replace rank entries
-
let (old_records, new_records, dids) = match rollup {
-
Rollup::Hourly(hourly_cursor) => {
-
let old_records =
-
HourlyRecordsKey::new(hourly_cursor, before_records_count.into(), &nsid);
-
let new_records = old_records.with_rank(rolled.records().into());
-
let new_estimate = rolled.dids().estimate() as u64;
-
let dids = if new_estimate == before_dids_estimate {
-
None
-
} else {
-
let old_dids =
-
HourlyDidsKey::new(hourly_cursor, before_dids_estimate.into(), &nsid);
-
let new_dids = old_dids.with_rank(new_estimate.into());
-
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-
};
-
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-
}
-
Rollup::Weekly(weekly_cursor) => {
-
let old_records =
-
WeeklyRecordsKey::new(weekly_cursor, before_records_count.into(), &nsid);
-
let new_records = old_records.with_rank(rolled.records().into());
-
let new_estimate = rolled.dids().estimate() as u64;
-
let dids = if new_estimate == before_dids_estimate {
-
None
-
} else {
-
let old_dids =
-
WeeklyDidsKey::new(weekly_cursor, before_dids_estimate.into(), &nsid);
-
let new_dids = old_dids.with_rank(new_estimate.into());
-
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-
};
-
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-
}
-
Rollup::AllTime => {
-
let old_records = AllTimeRecordsKey::new(before_records_count.into(), &nsid);
-
let new_records = old_records.with_rank(rolled.records().into());
-
let new_estimate = rolled.dids().estimate() as u64;
-
let dids = if new_estimate == before_dids_estimate {
-
None
-
} else {
-
let old_dids = AllTimeDidsKey::new(before_dids_estimate.into(), &nsid);
-
let new_dids = old_dids.with_rank(new_estimate.into());
-
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-
};
-
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-
}
-
};
+
// new ranks
+
let new_creates_count = rolled.counts().creates;
+
let new_dids_estimate = rolled.dids().estimate() as u64;
-
// replace the ranks
-
batch.remove(&self.rollups, &old_records);
-
batch.insert(&self.rollups, &new_records, "");
-
if let Some((old_dids, new_dids)) = dids {
-
batch.remove(&self.rollups, &old_dids);
-
batch.insert(&self.rollups, &new_dids, "");
+
// update create-ranked secondary index if rank changed
+
if new_creates_count != before_creates_count {
+
let (old_k, new_k) = match rollup {
+
Rollup::Hourly(cursor) => (
+
HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+
.to_db_bytes()?,
+
HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+
.to_db_bytes()?,
+
),
+
Rollup::Weekly(cursor) => (
+
WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+
.to_db_bytes()?,
+
WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+
.to_db_bytes()?,
+
),
+
Rollup::AllTime => (
+
AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
+
AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
+
),
+
};
+
batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+
batch.insert(&self.rollups, &new_k, "");
+
}
+
+
// update dids-ranked secondary index if rank changed
+
if new_dids_estimate != before_dids_estimate {
+
let (old_k, new_k) = match rollup {
+
Rollup::Hourly(cursor) => (
+
HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+
.to_db_bytes()?,
+
HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+
.to_db_bytes()?,
+
),
+
Rollup::Weekly(cursor) => (
+
WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+
.to_db_bytes()?,
+
WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+
.to_db_bytes()?,
+
),
+
Rollup::AllTime => (
+
AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
+
AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
+
),
+
};
+
batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+
batch.insert(&self.rollups, &new_k, "");
-
// replace the rollup
+
// replace the main counts rollup
batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
···
let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-
let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate);
+
let counts_value = CountsValue::new(
+
CommitCounts {
+
creates: commits.creates as u64,
+
updates: commits.updates as u64,
+
deletes: commits.deletes as u64,
+
},
+
commits.dids_estimate,
+
);
batch.insert(
&self.rollups,
&live_counts_key.to_db_bytes()?,
···
);
write.insert_batch(batch.batch)?;
-
let (records, dids) = read.get_counts_by_collection(&collection)?;
-
assert_eq!(records, 1);
+
let (creates, dids) = read.get_counts_by_collection(&collection)?;
+
assert_eq!(creates, 1);
assert_eq!(dids, 1);
let records = read.get_records_by_collections([collection].into(), 2, false)?;
+12 -5
ufos/src/storage_mem.rs
···
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
-
AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
+
AllTimeRollupKey, CommitCounts, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
···
}
}
Ok((
-
total_counts.records(),
+
total_counts.counts().creates,
total_counts.dids().estimate() as u64,
))
}
···
assert_eq!(n, tripppin.len());
assert_eq!(counts.prefix, and_back.prefix);
assert_eq!(counts.dids().estimate(), and_back.dids().estimate());
-
if counts.records() > 20000000 {
+
if counts.counts().creates > 20000000 {
panic!("COUNTS maybe wtf? {counts:?}")
}
// assert_eq!(rolled, and_back);
···
assert_eq!(n, tripppin.len());
assert_eq!(rolled.prefix, and_back.prefix);
assert_eq!(rolled.dids().estimate(), and_back.dids().estimate());
-
if rolled.records() > 20000000 {
+
if rolled.counts().creates > 20000000 {
panic!("maybe wtf? {rolled:?}")
}
// assert_eq!(rolled, and_back);
···
}
}
let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-
let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate);
+
let counts_value = CountsValue::new(
+
CommitCounts {
+
creates: commits.creates as u64,
+
updates: commits.updates as u64,
+
deletes: commits.deletes as u64,
+
},
+
commits.dids_estimate,
+
);
batch.insert(
&self.rollups,
&live_counts_key.to_db_bytes()?,
+39 -23
ufos/src/store_types.rs
···
)
}
}
-
#[derive(Debug, PartialEq, Decode, Encode)]
-
pub struct TotalRecordsValue(pub u64);
-
impl UseBincodePlz for TotalRecordsValue {}
-
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
+
#[derive(Debug, Clone, Copy, Default, PartialEq, Decode, Encode)]
// Totals of commits split by kind; replaces the single-number
// `TotalRecordsValue` as the prefix of `CountsValue`.
+
pub struct CommitCounts {
+
pub creates: u64, // number of create commits
+
pub updates: u64, // number of update commits
+
pub deletes: u64, // number of delete commits
+
}
+
impl CommitCounts {
// Adds each of `other`'s counters onto the matching counter of `self`.
// Uses plain `+=`, so overflow follows Rust's default integer behaviour
// (panic when overflow checks are on, wrap otherwise).
+
pub fn merge(&mut self, other: &Self) {
+
self.creates += other.creates;
+
self.updates += other.updates;
+
self.deletes += other.deletes;
+
}
+
}
// Marker impl with an empty body; presumably opts the type into the
// bincode-backed `DbBytes` encoding — TODO confirm against `UseBincodePlz`.
+
impl UseBincodePlz for CommitCounts {}
+
+
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct EstimatedDidsValue(pub Sketch<14>);
impl SerdeBytes for EstimatedDidsValue {}
impl DbBytes for EstimatedDidsValue {
···
}
}
-
pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>;
+
pub type CountsValue = DbConcat<CommitCounts, EstimatedDidsValue>;
impl CountsValue {
// Helpers for the rollup value: commit counters (prefix) concatenated with a
// DID sketch (suffix).
// Constructor: the removed signature took a bare total, the added one takes
// the full `CommitCounts`.
-
pub fn new(total: u64, dids: Sketch<14>) -> Self {
+
pub fn new(counts: CommitCounts, dids: Sketch<14>) -> Self {
Self {
-
prefix: TotalRecordsValue(total),
+
prefix: counts,
suffix: EstimatedDidsValue(dids),
}
}
// Counter accessor: `records()` (removed) returned the single u64; `counts()`
// (added) returns the `CommitCounts` prefix by value (the type is `Copy`).
-
pub fn records(&self) -> u64 {
-
self.prefix.0
+
pub fn counts(&self) -> CommitCounts {
+
self.prefix
}
// Borrows the DID sketch held in the suffix.
pub fn dids(&self) -> &Sketch<14> {
&self.suffix.0
}
// Folds `other` into `self`. In the added lines (near the end of this block)
// the counters go through `CommitCounts::merge` and the sketches through the
// sketch's own `merge`.
pub fn merge(&mut self, other: &Self) {
-
self.prefix.0 += other.records();
-
self.suffix.0.merge(other.dids());
-
}
-
}
// The hand-written `Default` impl below is removed by this change; the generic
// `Default for DbConcat<P, S>` added in db_types.rs takes its place.
-
impl Default for CountsValue {
-
fn default() -> Self {
-
Self {
-
prefix: TotalRecordsValue(0),
-
suffix: EstimatedDidsValue(Sketch::<14>::default()),
-
}
+
self.prefix.merge(&other.prefix);
+
self.suffix.0.merge(&other.suffix.0);
}
}
impl From<&CountsValue> for JustCount {
fn from(cv: &CountsValue) -> Self {
Self {
-
records: cv.records(),
+
creates: cv.counts().creates,
dids_estimate: cv.dids().estimate() as u64,
}
}
···
#[cfg(test)]
mod test {
use super::{
-
CountsValue, Cursor, CursorBucket, Did, EncodingError, HourTruncatedCursor,
+
CommitCounts, CountsValue, Cursor, CursorBucket, Did, EncodingError, HourTruncatedCursor,
HourlyRollupKey, Nsid, Sketch, HOUR_IN_MICROS, WEEK_IN_MICROS,
};
use crate::db_types::DbBytes;
···
Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap(),
));
}
-
let original = CountsValue::new(123, estimator.clone());
+
let original = CountsValue::new(
+
CommitCounts {
+
creates: 123,
+
..Default::default()
+
},
+
estimator.clone(),
+
);
let serialized = original.to_db_bytes()?;
let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?;
assert_eq!(restored, original);
···
Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap(),
));
}
-
let original = CountsValue::new(123, estimator);
+
let original = CountsValue::new(
+
CommitCounts {
+
creates: 123,
+
..Default::default()
+
},
+
estimator,
+
);
let serialized = original.to_db_bytes()?;
let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?;
assert_eq!(restored, original);