Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Merge pull request #18 from at-microcosm/ufos-salty-hash

UFOs HLL DID-hashing: secret prefix
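What the change does: UFOs keeps a HyperLogLog-style sketch of distinct DIDs per collection, and this PR swaps the wyhash-keyed CardinalityEstimator<Did> for cardinality-estimator-safe 4.x Sketch<14> values whose elements are built from SHA-256 over a secret 16-byte prefix followed by the DID bytes. Because register positions now depend on a secret, DIDs can't be precomputed to collide and skew the public counts. A minimal sketch of the pattern, using only API calls that appear in this diff (the zero secret and example DIDs are illustrative):

use cardinality_estimator_safe::{Element, Sketch};
use sha2::Sha256;

fn main() {
    // The service generates this once with getrandom::fill and persists it
    // under the "sketch_secret" key; zeros keep the demo deterministic.
    let secret: [u8; 16] = [0u8; 16];
    let mut sketch = Sketch::<14>::default();
    for did in ["did:plc:alice", "did:plc:bob", "did:plc:alice"] {
        // Register position and value derive from SHA-256(secret || did).
        let element: Element<14> =
            Element::from_digest_with_prefix::<Sha256>(&secret, did.as_bytes());
        sketch.insert(element);
    }
    // Duplicate inserts collapse; tiny counts come back exact (the tests in
    // this PR rely on that).
    assert_eq!(sketch.estimate(), 2);
}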

+25 -13
Cargo.lock
···
[[package]]
name = "cardinality-estimator-safe"
- version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "50c14632b90cb42ff2174d2a544ca553c1bccfab54b848ae9ab9e004b90243bf"
dependencies = [
"enum_dispatch",
"serde",
- "wyhash",
]
[[package]]
···
[[package]]
name = "getrandom"
- version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0"
dependencies = [
"cfg-if",
"libc",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
dependencies = [
- "getrandom 0.3.2",
"libc",
]
···
[[package]]
name = "rand"
- version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
- "zerocopy 0.8.24",
]
[[package]]
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
- "getrandom 0.3.2",
]
[[package]]
···
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
dependencies = [
"fastrand",
- "getrandom 0.3.2",
"once_cell",
"rustix 1.0.5",
"windows-sys 0.59.0",
···
"httparse",
"log",
"native-tls",
- "rand 0.9.0",
"sha1",
"thiserror 2.0.12",
"url",
···
"dropshot",
"env_logger",
"fjall",
"http",
"jetstream",
"log",
···
"semver",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.12",
"tikv-jemallocator",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
dependencies = [
- "getrandom 0.3.2",
"serde",
]
···
[[package]]
name = "cardinality-estimator-safe"
+ version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "b41ec0cd313b46ba3b508377544b25aa1d56d05ce9e657e77dfb001d5e726e53"
dependencies = [
+ "digest",
"enum_dispatch",
"serde",
]
[[package]]
···
[[package]]
name = "getrandom"
+ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
dependencies = [
+ "getrandom 0.3.3",
"libc",
]
···
[[package]]
name = "rand"
+ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
]
[[package]]
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
+ "getrandom 0.3.3",
]
[[package]]
···
]
[[package]]
+ name = "sha2"
+ version = "0.10.9"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
+ dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+ ]
+
+ [[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
dependencies = [
"fastrand",
+ "getrandom 0.3.3",
"once_cell",
"rustix 1.0.5",
"windows-sys 0.59.0",
···
"httparse",
"log",
"native-tls",
+ "rand 0.9.1",
"sha1",
"thiserror 2.0.12",
"url",
···
"dropshot",
"env_logger",
"fjall",
+ "getrandom 0.3.3",
"http",
"jetstream",
"log",
···
"semver",
"serde",
"serde_json",
+ "sha2",
"tempfile",
"thiserror 2.0.12",
"tikv-jemallocator",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
dependencies = [
+ "getrandom 0.3.3",
"serde",
]
+3 -1
ufos/Cargo.toml
···
anyhow = "1.0.97"
async-trait = "0.1.88"
bincode = { version = "2.0.1", features = ["serde"] }
- cardinality-estimator-safe = { version = "2.1.1", features = ["with_serde"] }
clap = { version = "4.5.31", features = ["derive"] }
dropshot = "0.16.0"
env_logger = "0.11.7"
fjall = { version = "2.8.0", features = ["lz4"] }
http = "1.3.1"
jetstream = { path = "../jetstream" }
log = "0.4.26"
···
semver = "1.0.26"
serde = "1.0.219"
serde_json = "1.0.140"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
···
anyhow = "1.0.97"
async-trait = "0.1.88"
bincode = { version = "2.0.1", features = ["serde"] }
+ cardinality-estimator-safe = { version = "4.0.1", features = ["with_serde", "with_digest"] }
clap = { version = "4.5.31", features = ["derive"] }
dropshot = "0.16.0"
env_logger = "0.11.7"
fjall = { version = "2.8.0", features = ["lz4"] }
+ getrandom = "0.3.3"
http = "1.3.1"
jetstream = { path = "../jetstream" }
log = "0.4.26"
···
semver = "1.0.26"
serde = "1.0.219"
serde_json = "1.0.140"
+ sha2 = "0.10.9"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
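The two new direct dependencies map onto the two halves of the feature: getrandom mints the 16-byte secret on first startup, and sha2 supplies the digest that cardinality-estimator-safe consumes through its new with_digest feature. A trimmed sketch of the generation step as it appears in storage_fjall.rs below, panicking where the real code returns StorageError::InitError:

fn new_sketch_secret() -> [u8; 16] {
    let mut secret = [0u8; 16];
    // Fill from the OS RNG; the real code maps failure into an init error.
    getrandom::fill(&mut secret).expect("OS randomness unavailable");
    secret
}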
+12 -2
ufos/src/consumer.rs
···
use jetstream::{
events::{Cursor, EventKind, JetstreamEvent},
exports::{Did, Nsid},
···
jetstream_receiver: JetstreamReceiver,
batch_sender: Sender<LimitedBatch>,
current_batch: CurrentBatch,
}
pub async fn consume(
jetstream_endpoint: &str,
cursor: Option<Cursor>,
no_compress: bool,
) -> anyhow::Result<Receiver<LimitedBatch>> {
let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
if endpoint == jetstream_endpoint {
···
.connect_cursor(cursor)
.await?;
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
- let mut batcher = Batcher::new(jetstream_receiver, batch_sender);
tokio::task::spawn(async move { batcher.run().await });
Ok(batch_reciever)
}
impl Batcher {
- pub fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self {
Self {
jetstream_receiver,
batch_sender,
current_batch: Default::default(),
}
}
···
&collection,
commit,
MAX_BATCHED_COLLECTIONS,
);
if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
···
&collection,
commit,
MAX_BATCHED_COLLECTIONS,
)?;
} else {
optimistic_res?;
···
+ use crate::store_types::SketchSecretPrefix;
use jetstream::{
events::{Cursor, EventKind, JetstreamEvent},
exports::{Did, Nsid},
···
jetstream_receiver: JetstreamReceiver,
batch_sender: Sender<LimitedBatch>,
current_batch: CurrentBatch,
+ sketch_secret: SketchSecretPrefix,
}
pub async fn consume(
jetstream_endpoint: &str,
cursor: Option<Cursor>,
no_compress: bool,
+ sketch_secret: SketchSecretPrefix,
) -> anyhow::Result<Receiver<LimitedBatch>> {
let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
if endpoint == jetstream_endpoint {
···
.connect_cursor(cursor)
.await?;
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
+ let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret);
tokio::task::spawn(async move { batcher.run().await });
Ok(batch_reciever)
}
impl Batcher {
+ pub fn new(
+ jetstream_receiver: JetstreamReceiver,
+ batch_sender: Sender<LimitedBatch>,
+ sketch_secret: SketchSecretPrefix,
+ ) -> Self {
Self {
jetstream_receiver,
batch_sender,
current_batch: Default::default(),
+ sketch_secret,
}
}
···
&collection,
commit,
MAX_BATCHED_COLLECTIONS,
+ &self.sketch_secret,
);
if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
···
&collection,
commit,
MAX_BATCHED_COLLECTIONS,
+ &self.sketch_secret,
)?;
} else {
optimistic_res?;
+2
ufos/src/db_types.rs
···
//////
impl DbBytes for Vec<u8> {
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
Ok(self.to_vec())
···
//////
+ impl<const N: usize> UseBincodePlz for [u8; N] {}
+
impl DbBytes for Vec<u8> {
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
Ok(self.to_vec())
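That one-line blanket impl is what lets the new [u8; 16] secret ride the existing bincode-backed DbBytes plumbing: UseBincodePlz marks every fixed-size byte array as bincode-serializable. The same const-generic shape, reduced to a self-contained toy (trait and method names here are made up, not the real DbBytes machinery):

trait EncodePlz {
    fn encode(&self) -> Vec<u8>;
}

// One impl covers [u8; 1], [u8; 16], [u8; 32], ... via const generics.
impl<const N: usize> EncodePlz for [u8; N] {
    fn encode(&self) -> Vec<u8> {
        self.to_vec() // the length is part of the type, so no header needed
    }
}

fn main() {
    let secret: [u8; 16] = [7; 16];
    assert_eq!(secret.encode().len(), 16);
}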
+6 -2
ufos/src/file_consumer.rs
···
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
use anyhow::Result;
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
use std::path::PathBuf;
···
Ok(())
}
- pub async fn consume(p: PathBuf) -> Result<Receiver<LimitedBatch>> {
let f = File::open(p).await?;
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
- let mut batcher = Batcher::new(jsonl_receiver, batch_sender);
tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await });
tokio::task::spawn(async move { batcher.run().await });
Ok(batch_reciever)
···
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
+ use crate::store_types::SketchSecretPrefix;
use anyhow::Result;
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
use std::path::PathBuf;
···
Ok(())
}
+ pub async fn consume(
+ p: PathBuf,
+ sketch_secret: SketchSecretPrefix,
+ ) -> Result<Receiver<LimitedBatch>> {
let f = File::open(p).await?;
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
+ let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await });
tokio::task::spawn(async move { batcher.run().await });
Ok(batch_reciever)
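Both consumers now take the secret up front and hand it to the shared Batcher, which threads it into every truncating_insert. Hypothetical wiring that mirrors the two consume() signatures in this diff; the endpoint, fixture path, and zero secret are placeholders:

use ufos::store_types::SketchSecretPrefix;

async fn start_consumer(use_fixture: bool) -> anyhow::Result<()> {
    let sketch_secret: SketchSecretPrefix = [0u8; 16];
    let batches = if use_fixture {
        ufos::file_consumer::consume("fixture.jsonl".into(), sketch_secret).await?
    } else {
        ufos::consumer::consume("wss://jetstream.example", None, false, sketch_secret).await?
    };
    // a real caller hands the receiver to the storage writer, as main.rs does
    let _ = batches;
    Ok(())
}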
+135 -94
ufos/src/lib.rs
···
pub mod store_types;
use crate::error::BatchInsertError;
- use cardinality_estimator_safe::CardinalityEstimator;
use error::FirehoseEventError;
use jetstream::events::{CommitEvent, CommitOp, Cursor};
use jetstream::exports::{Did, Nsid, RecordKey};
use schemars::JsonSchema;
use serde::Serialize;
use serde_json::value::RawValue;
use std::collections::HashMap;
#[derive(Debug, Default, Clone)]
pub struct CollectionCommits<const LIMIT: usize> {
pub total_seen: usize,
- pub dids_estimate: CardinalityEstimator<Did>,
pub commits: Vec<UFOsCommit>,
head: usize,
non_creates: usize,
···
self.head = 0;
}
}
- pub fn truncating_insert(&mut self, commit: UFOsCommit) -> Result<(), BatchInsertError> {
if self.non_creates == LIMIT {
return Err(BatchInsertError::BatchFull(commit));
}
···
if is_create {
self.total_seen += 1;
- self.dids_estimate.insert(&did);
} else {
self.non_creates += 1;
}
···
collection: &Nsid,
commit: UFOsCommit,
max_collections: usize,
) -> Result<(), BatchInsertError> {
let map = &mut self.commits_by_nsid;
if !map.contains_key(collection) && map.len() >= max_collections {
···
}
map.entry(collection.clone())
.or_default()
- .truncating_insert(commit)?;
Ok(())
}
pub fn total_records(&self) -> usize {
···
self.account_removes.len()
}
pub fn estimate_dids(&self) -> usize {
- let mut estimator = CardinalityEstimator::<Did>::new();
for commits in self.commits_by_nsid.values() {
estimator.merge(&commits.dids_estimate);
}
···
fn test_truncating_insert_truncates() -> anyhow::Result<()> {
let mut commits: CollectionCommits<2> = Default::default();
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(100),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
- rev: "rev-asdf".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })?;
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(101),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
- rev: "rev-asdg".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })?;
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(102),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
- rev: "rev-asdh".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })?;
assert_eq!(commits.total_seen, 3);
assert_eq!(commits.dids_estimate.estimate(), 1);
···
fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
let mut commits: CollectionCommits<2> = Default::default();
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(100),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
- rev: "rev-asdf".to_string(),
- action: CommitAction::Cut,
- })?;
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(101),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
- rev: "rev-asdg".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })?;
- commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(102),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
- rev: "rev-asdh".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })?;
assert_eq!(commits.total_seen, 2);
assert_eq!(commits.dids_estimate.estimate(), 1);
···
let mut commits: CollectionCommits<2> = Default::default();
commits
- .truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(100),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
- rev: "rev-asdf".to_string(),
- action: CommitAction::Cut,
- })
.unwrap();
// this create will just be discarded
commits
- .truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(80),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-zzz".to_string()).unwrap(),
- rev: "rev-asdzzz".to_string(),
- action: CommitAction::Put(PutAction {
- record: RawValue::from_string("{}".to_string())?,
- is_update: false,
- }),
- })
.unwrap();
commits
- .truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(101),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
- rev: "rev-asdg".to_string(),
- action: CommitAction::Cut,
- })
.unwrap();
- let res = commits.truncating_insert(UFOsCommit {
- cursor: Cursor::from_raw_u64(102),
- did: Did::new("did:plc:whatever".to_string()).unwrap(),
- rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
- rev: "rev-asdh".to_string(),
- action: CommitAction::Cut,
- });
assert!(res.is_err());
let overflowed = match res {
···
pub mod store_types;
use crate::error::BatchInsertError;
+ use crate::store_types::SketchSecretPrefix;
+ use cardinality_estimator_safe::{Element, Sketch};
use error::FirehoseEventError;
use jetstream::events::{CommitEvent, CommitOp, Cursor};
use jetstream::exports::{Did, Nsid, RecordKey};
use schemars::JsonSchema;
use serde::Serialize;
use serde_json::value::RawValue;
+ use sha2::Sha256;
use std::collections::HashMap;
+
+ fn did_element(sketch_secret: &SketchSecretPrefix, did: &Did) -> Element<14> {
+ Element::from_digest_with_prefix::<Sha256>(sketch_secret, did.as_bytes())
+ }
#[derive(Debug, Default, Clone)]
pub struct CollectionCommits<const LIMIT: usize> {
pub total_seen: usize,
+ pub dids_estimate: Sketch<14>,
pub commits: Vec<UFOsCommit>,
head: usize,
non_creates: usize,
···
self.head = 0;
}
}
+ pub fn truncating_insert(
+ &mut self,
+ commit: UFOsCommit,
+ sketch_secret: &SketchSecretPrefix,
+ ) -> Result<(), BatchInsertError> {
if self.non_creates == LIMIT {
return Err(BatchInsertError::BatchFull(commit));
}
···
if is_create {
self.total_seen += 1;
+ self.dids_estimate.insert(did_element(sketch_secret, &did));
} else {
self.non_creates += 1;
}
···
collection: &Nsid,
commit: UFOsCommit,
max_collections: usize,
+ sketch_secret: &SketchSecretPrefix,
) -> Result<(), BatchInsertError> {
let map = &mut self.commits_by_nsid;
if !map.contains_key(collection) && map.len() >= max_collections {
···
}
map.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, sketch_secret)?;
Ok(())
}
pub fn total_records(&self) -> usize {
···
self.account_removes.len()
}
pub fn estimate_dids(&self) -> usize {
+ let mut estimator = Sketch::<14>::default();
for commits in self.commits_by_nsid.values() {
estimator.merge(&commits.dids_estimate);
}
···
fn test_truncating_insert_truncates() -> anyhow::Result<()> {
let mut commits: CollectionCommits<2> = Default::default();
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(100),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
+ rev: "rev-asdf".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )?;
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(101),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
+ rev: "rev-asdg".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )?;
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(102),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
+ rev: "rev-asdh".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )?;
assert_eq!(commits.total_seen, 3);
assert_eq!(commits.dids_estimate.estimate(), 1);
···
fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
let mut commits: CollectionCommits<2> = Default::default();
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(100),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
+ rev: "rev-asdf".to_string(),
+ action: CommitAction::Cut,
+ },
+ &[0u8; 16],
+ )?;
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(101),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
+ rev: "rev-asdg".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )?;
+ commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(102),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
+ rev: "rev-asdh".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )?;
assert_eq!(commits.total_seen, 2);
assert_eq!(commits.dids_estimate.estimate(), 1);
···
let mut commits: CollectionCommits<2> = Default::default();
commits
+ .truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(100),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
+ rev: "rev-asdf".to_string(),
+ action: CommitAction::Cut,
+ },
+ &[0u8; 16],
+ )
.unwrap();
// this create will just be discarded
commits
+ .truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(80),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-zzz".to_string()).unwrap(),
+ rev: "rev-asdzzz".to_string(),
+ action: CommitAction::Put(PutAction {
+ record: RawValue::from_string("{}".to_string())?,
+ is_update: false,
+ }),
+ },
+ &[0u8; 16],
+ )
.unwrap();
commits
+ .truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(101),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
+ rev: "rev-asdg".to_string(),
+ action: CommitAction::Cut,
+ },
+ &[0u8; 16],
+ )
.unwrap();
+ let res = commits.truncating_insert(
+ UFOsCommit {
+ cursor: Cursor::from_raw_u64(102),
+ did: Did::new("did:plc:whatever".to_string()).unwrap(),
+ rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
+ rev: "rev-asdh".to_string(),
+ action: CommitAction::Cut,
+ },
+ &[0u8; 16],
+ );
assert!(res.is_err());
let overflowed = match res {
+8 -4
ufos/src/main.rs
···
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use ufos::storage_fjall::FjallStorage;
use ufos::storage_mem::MemStorage;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
···
let args = Args::parse();
let jetstream = args.jetstream.clone();
if args.in_mem {
- let (read_store, write_store, cursor) = MemStorage::init(
args.data,
jetstream,
args.jetstream_force,
···
read_store,
write_store,
cursor,
)
.await?;
} else {
- let (read_store, write_store, cursor) = FjallStorage::init(
args.data,
jetstream,
args.jetstream_force,
···
read_store,
write_store,
cursor,
)
.await?;
}
···
read_store: impl StoreReader + 'static,
mut write_store: impl StoreWriter<B> + 'static,
cursor: Option<Cursor>,
) -> anyhow::Result<()> {
println!("starting server with storage...");
let serving = server::serve(read_store);
···
let batches = if jetstream_fixture {
log::info!("starting with jestream file fixture: {jetstream:?}");
- file_consumer::consume(jetstream.into()).await?
} else {
log::info!(
"starting consumer with cursor: {cursor:?} from {:?} ago",
cursor.map(|c| c.elapsed())
);
- consumer::consume(&jetstream, cursor, false).await?
};
let rolling = write_store.background_tasks()?.run();
···
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use ufos::storage_fjall::FjallStorage;
use ufos::storage_mem::MemStorage;
+ use ufos::store_types::SketchSecretPrefix;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
···
let args = Args::parse();
let jetstream = args.jetstream.clone();
if args.in_mem {
+ let (read_store, write_store, cursor, sketch_secret) = MemStorage::init(
args.data,
jetstream,
args.jetstream_force,
···
read_store,
write_store,
cursor,
+ sketch_secret,
)
.await?;
} else {
+ let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
args.data,
jetstream,
args.jetstream_force,
···
read_store,
write_store,
cursor,
+ sketch_secret,
)
.await?;
}
···
read_store: impl StoreReader + 'static,
mut write_store: impl StoreWriter<B> + 'static,
cursor: Option<Cursor>,
+ sketch_secret: SketchSecretPrefix,
) -> anyhow::Result<()> {
println!("starting server with storage...");
let serving = server::serve(read_store);
···
let batches = if jetstream_fixture {
log::info!("starting with jestream file fixture: {jetstream:?}");
+ file_consumer::consume(jetstream.into(), sketch_secret).await?
} else {
log::info!(
"starting consumer with cursor: {cursor:?} from {:?} ago",
cursor.map(|c| c.elapsed())
);
+ consumer::consume(&jetstream, cursor, false, sketch_secret).await?
};
let rolling = write_store.background_tasks()?.run();
+2 -1
ufos/src/storage.rs
···
use crate::{
error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections,
UFOsRecord,
···
endpoint: String,
force_endpoint: bool,
config: C,
- ) -> StorageResult<(R, W, Option<Cursor>)>
where
Self: Sized;
}
···
+ use crate::store_types::SketchSecretPrefix;
use crate::{
error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections,
UFOsRecord,
···
endpoint: String,
force_endpoint: bool,
config: C,
+ ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)>
where
Self: Sized;
}
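init returns the secret as a fourth tuple element because sketches only merge meaningfully when every element was salted with the same prefix; a fresh secret per run would make the same DID land on different registers and double-count in historical rollups. A hypothetical standalone demo of that property:

use cardinality_estimator_safe::{Element, Sketch};
use sha2::Sha256;

fn salted(secret: &[u8; 16], did: &str) -> Element<14> {
    Element::from_digest_with_prefix::<Sha256>(secret, did.as_bytes())
}

fn main() {
    let mut a = Sketch::<14>::default();
    let mut b = Sketch::<14>::default();
    a.insert(salted(&[1; 16], "did:plc:alice"));
    b.insert(salted(&[1; 16], "did:plc:alice")); // same secret: same element
    a.merge(&b);
    assert_eq!(a.estimate(), 1);

    let mut c = Sketch::<14>::default();
    c.insert(salted(&[2; 16], "did:plc:alice")); // new secret: new element
    a.merge(&c);
    assert_eq!(a.estimate(), 2); // one DID, counted twice across secrets
}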
+59 -15
ufos/src/storage_fjall.rs
···
DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey,
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
- RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey,
- TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyDidsKey, WeeklyRecordsKey,
- WeeklyRollupKey,
};
use crate::{
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
/// - Launch date
/// - key: "takeoff" (literal)
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
/// - key: "rollup_cursor" (literal)
···
endpoint: String,
force_endpoint: bool,
_config: FjallConfig,
- ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>)> {
let keyspace = {
let config = Config::new(path);
···
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
- if js_cursor.is_some() {
let stored_endpoint =
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
-
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
"found cursor but missing js_endpoint, refusing to start.".to_string(),
))?;
if stored != endpoint {
if force_endpoint {
log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
···
"stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start.")));
}
}
} else {
- insert_static_neu::<JetstreamEndpointKey>(
&global,
JetstreamEndpointValue(endpoint.to_string()),
)?;
- insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
- insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
- }
let reader = FjallReader {
keyspace: keyspace.clone(),
···
rollups,
queues,
};
- Ok((reader, writer, js_cursor))
}
}
···
Ok(())
}
/// Set a value to a fixed key
fn insert_batch_static_neu<K: StaticStr>(
batch: &mut FjallBatch,
···
use serde_json::value::RawValue;
fn fjall_db() -> (FjallReader, FjallWriter) {
- let (read, write, _) = FjallStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
false,
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey,
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
+ RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
+ SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
+ WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey,
};
use crate::{
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
/// - Launch date
/// - key: "takeoff" (literal)
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
+ ///
+ /// - Cardinality estimator secret
+ /// - key: "sketch_secret" (literal)
+ /// - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
/// - key: "rollup_cursor" (literal)
···
endpoint: String,
force_endpoint: bool,
_config: FjallConfig,
+ ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
let keyspace = {
let config = Config::new(path);
···
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
+ let sketch_secret = if js_cursor.is_some() {
let stored_endpoint =
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
"found cursor but missing js_endpoint, refusing to start.".to_string(),
))?;
+ let Some(stored_secret) =
+ get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
+ else {
+ return Err(StorageError::InitError(
+ "found cursor but missing sketch_secret, refusing to start.".to_string(),
+ ));
+ };
+
if stored != endpoint {
if force_endpoint {
log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
···
"stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start.")));
}
}
+ stored_secret
} else {
+ log::info!("initializing a fresh db!");
+ init_static_neu::<JetstreamEndpointKey>(
&global,
JetstreamEndpointValue(endpoint.to_string()),
)?;
+
+ log::info!("generating new secret for cardinality sketches...");
+ let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
+ getrandom::fill(&mut sketch_secret).map_err(|e| {
+ StorageError::InitError(format!(
+ "failed to get a random secret for cardinality sketches: {e:?}"
+ ))
+ })?;
+ init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
+
+ init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
+ init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
+
+ sketch_secret
+ };
let reader = FjallReader {
keyspace: keyspace.clone(),
···
rollups,
queues,
};
+ Ok((reader, writer, js_cursor, sketch_secret))
}
}
···
Ok(())
}
+ /// Set a value to a fixed key, erroring if the value already exists
+ ///
+ /// Intended for single-threaded init: not safe under concurrency, since there
+ /// is no transaction between checking if the value already exists and writing it.
+ fn init_static_neu<K: StaticStr>(
+ global: &PartitionHandle,
+ value: impl DbBytes,
+ ) -> StorageResult<()> {
+ let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
+ if global.get(&key_bytes)?.is_some() {
+ return Err(StorageError::InitError(format!(
+ "init failed: value for key {key_bytes:?} already exists"
+ )));
+ }
+ let value_bytes = value.to_db_bytes()?;
+ global.insert(&key_bytes, &value_bytes)?;
+ Ok(())
+ }
+
/// Set a value to a fixed key
fn insert_batch_static_neu<K: StaticStr>(
batch: &mut FjallBatch,
···
use serde_json::value::RawValue;
fn fjall_db() -> (FjallReader, FjallWriter) {
+ let (read, write, _, _) = FjallStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
false,
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
+9 -8
ufos/src/storage_mem.rs
···
HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
- RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue,
- WeekTruncatedCursor, WeeklyRollupKey,
};
use crate::{
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
endpoint: String,
force_endpoint: bool,
_config: MemConfig,
- ) -> StorageResult<(MemReader, MemWriter, Option<Cursor>)> {
let keyspace = MemKeyspace::open();
let global = keyspace.open_partition("global")?;
···
rollups,
queues,
};
- Ok((reader, writer, js_cursor))
}
}
···
use serde_json::value::RawValue;
fn fjall_db() -> (MemReader, MemWriter) {
- let (read, write, _) = MemStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
false,
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
- .truncating_insert(commit)
.unwrap();
collection
···
HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
+ RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey,
+ TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey,
};
use crate::{
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
endpoint: String,
force_endpoint: bool,
_config: MemConfig,
+ ) -> StorageResult<(MemReader, MemWriter, Option<Cursor>, SketchSecretPrefix)> {
let keyspace = MemKeyspace::open();
let global = keyspace.open_partition("global")?;
···
rollups,
queues,
};
+ let secret_prefix = [0u8; 16]; // in-mem store is always deterministic: no secret
+ Ok((reader, writer, js_cursor, secret_prefix))
}
}
···
use serde_json::value::RawValue;
fn fjall_db() -> (MemReader, MemWriter) {
+ let (read, write, _, _) = MemStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
false,
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
···
.commits_by_nsid
.entry(collection.clone())
.or_default()
+ .truncating_insert(commit, &[0u8; 16])
.unwrap();
collection
+23 -10
ufos/src/store_types.rs
···
};
use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit};
use bincode::{Decode, Encode};
- use cardinality_estimator_safe::CardinalityEstimator;
use std::ops::Range;
macro_rules! static_str {
···
// key format: ["js_cursor"]
static_str!("js_cursor", JetstreamCursorKey);
pub type JetstreamCursorValue = Cursor;
// key format: ["rollup_cursor"]
static_str!("rollup_cursor", NewRollupCursorKey);
···
impl UseBincodePlz for TotalRecordsValue {}
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
- pub struct EstimatedDidsValue(pub CardinalityEstimator<Did>);
impl SerdeBytes for EstimatedDidsValue {}
impl DbBytes for EstimatedDidsValue {
#[cfg(test)]
···
pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>;
impl CountsValue {
- pub fn new(total: u64, dids: CardinalityEstimator<Did>) -> Self {
Self {
prefix: TotalRecordsValue(total),
suffix: EstimatedDidsValue(dids),
···
pub fn records(&self) -> u64 {
self.prefix.0
}
- pub fn dids(&self) -> &CardinalityEstimator<Did> {
&self.suffix.0
}
pub fn merge(&mut self, other: &Self) {
···
fn default() -> Self {
Self {
prefix: TotalRecordsValue(0),
- suffix: EstimatedDidsValue(CardinalityEstimator::new()),
}
}
}
···
#[cfg(test)]
mod test {
use super::{
- CardinalityEstimator, CountsValue, Cursor, Did, EncodingError, HourTruncatedCursor,
- HourlyRollupKey, Nsid, HOUR_IN_MICROS,
};
use crate::db_types::DbBytes;
#[test]
fn test_by_hourly_rollup_key() -> Result<(), EncodingError> {
···
#[test]
fn test_by_hourly_rollup_value() -> Result<(), EncodingError> {
- let mut estimator = CardinalityEstimator::new();
for i in 0..10 {
- estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap());
}
let original = CountsValue::new(123, estimator.clone());
let serialized = original.to_db_bytes()?;
···
assert_eq!(bytes_consumed, serialized.len());
for i in 10..1_000 {
- estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap());
}
let original = CountsValue::new(123, estimator);
let serialized = original.to_db_bytes()?;
···
};
use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit};
use bincode::{Decode, Encode};
+ use cardinality_estimator_safe::Sketch;
use std::ops::Range;
macro_rules! static_str {
···
// key format: ["js_cursor"]
static_str!("js_cursor", JetstreamCursorKey);
pub type JetstreamCursorValue = Cursor;
+
+ // key format: ["sketch_secret"]
+ static_str!("sketch_secret", SketchSecretKey);
+ pub type SketchSecretPrefix = [u8; 16];
// key format: ["rollup_cursor"]
static_str!("rollup_cursor", NewRollupCursorKey);
···
impl UseBincodePlz for TotalRecordsValue {}
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
+ pub struct EstimatedDidsValue(pub Sketch<14>);
impl SerdeBytes for EstimatedDidsValue {}
impl DbBytes for EstimatedDidsValue {
#[cfg(test)]
···
pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>;
impl CountsValue {
+ pub fn new(total: u64, dids: Sketch<14>) -> Self {
Self {
prefix: TotalRecordsValue(total),
suffix: EstimatedDidsValue(dids),
···
pub fn records(&self) -> u64 {
self.prefix.0
}
+ pub fn dids(&self) -> &Sketch<14> {
&self.suffix.0
}
pub fn merge(&mut self, other: &Self) {
···
fn default() -> Self {
Self {
prefix: TotalRecordsValue(0),
+ suffix: EstimatedDidsValue(Sketch::<14>::default()),
}
}
}
···
#[cfg(test)]
mod test {
use super::{
+ CountsValue, Cursor, Did, EncodingError, HourTruncatedCursor, HourlyRollupKey, Nsid,
+ Sketch, HOUR_IN_MICROS,
};
use crate::db_types::DbBytes;
+ use cardinality_estimator_safe::Element;
+ use sha2::Sha256;
#[test]
fn test_by_hourly_rollup_key() -> Result<(), EncodingError> {
···
#[test]
fn test_by_hourly_rollup_value() -> Result<(), EncodingError> {
+ let mut estimator = Sketch::<14>::default();
+ fn to_element(d: Did) -> Element<14> {
+ Element::from_digest_oneshot::<Sha256>(d.to_string().as_bytes())
+ }
for i in 0..10 {
+ estimator.insert(to_element(
+ Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap(),
+ ));
}
let original = CountsValue::new(123, estimator.clone());
let serialized = original.to_db_bytes()?;
···
assert_eq!(bytes_consumed, serialized.len());
for i in 10..1_000 {
+ estimator.insert(to_element(
+ Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap(),
+ ));
}
let original = CountsValue::new(123, estimator);
let serialized = original.to_db_bytes()?;
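The test above pushes 1,000 distinct DIDs through from_digest_oneshot, the unsalted escape hatch that keeps test fixtures deterministic (the live path always salts via the secret prefix). A standalone version, with the usual HyperLogLog caveat that a 2^14-register sketch carries roughly 1% relative error at this scale:

use cardinality_estimator_safe::{Element, Sketch};
use sha2::Sha256;

fn main() {
    let mut sketch = Sketch::<14>::default();
    for i in 0..1_000 {
        let did = format!("did:plc:inze6wrmsm7pjl7yta3oig{i}");
        // No secret prefix: the digest covers the DID bytes alone.
        sketch.insert(Element::from_digest_oneshot::<Sha256>(did.as_bytes()));
    }
    println!("estimated {} distinct DIDs (true: 1000)", sketch.estimate());
}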