···
-use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr};
use crate::error::StorageError;
-use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter};
use crate::store_types::{
-AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
-HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
-JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
-NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
-RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue,
-WeekTruncatedCursor, WeeklyRollupKey,
};
-use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord};
use async_trait::async_trait;
-use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle};
use jetstream::events::Cursor;
-use std::collections::HashMap;
-use std::time::SystemTime;
-const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds
const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
···
/// - key: "takeoff" (literal)
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
/// - key: "rollup_cursor" (literal)
/// - val: u64 (tracks behind js_cursor)
···
/// - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
/// - Hourly total record counts and dids estimate per collection
/// - key: "hourly_counts" || u64 || nullstr (hour, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
/// - Weekly total record counts and dids estimate per collection
-/// - key: "weekly_counts" || u64 || nullstr (hour, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
/// - All-time total record counts and dids estimate per collection
/// - key: "ever_counts" || nullstr (nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
-/// - TODO: sorted indexes for all-times?
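// Illustrative aside (not part of the diff): a minimal sketch of how a counts
// key like "hourly_counts" || u64 || nullstr is assumed to be laid out, based
// on the doc comments above. Names here are hypothetical; the real encoding
// lives in store_types / db_types.
fn sketch_hourly_counts_key(hour_micros: u64, nsid: &str) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(b"hourly_counts"); // static string prefix
    key.extend_from_slice(&hour_micros.to_be_bytes()); // big-endian u64 so keys sort by hour
    key.extend_from_slice(nsid.as_bytes()); // nullstr: utf-8 bytes...
    key.push(0); // ...with a null terminator
    key
}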
···
-impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage {
-) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>)> {
let config = Config::new(path);
-let config = config.fsync_ms(Some(4_000));
···
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
-if js_cursor.is_some() {
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
"found cursor but missing js_endpoint, refusing to start.".to_string(),
···
return Err(StorageError::InitError(format!(
-"stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start.")));
-insert_static_neu::<JetstreamEndpointKey>(
JetstreamEndpointValue(endpoint.to_string()),
-insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
-insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
let reader = FjallReader {
keyspace: keyspace.clone(),
···
rollups: rollups.clone(),
let writer = FjallWriter {
···
-Ok((reader, writer, js_cursor))
···
fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
···
get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
.map(|c| c.to_raw_u64());
Ok(ConsumerInfo::Jetstream {
-fn get_top_collections(&self) -> Result<TopCollections, StorageError> {
-// TODO: limit nsid traversal depth
-// TODO: limit nsid traversal breadth
-// TODO: be serious about anything
-// TODO: probably use a stack of segments to reduce to ~log-n merges
-children: HashMap<String, Blah>,
-impl From<&Blah> for TopCollections {
-fn from(bla: &Blah) -> Self {
-total_records: bla.counts.records(),
-dids_estimate: bla.counts.dids().estimate() as u64,
-nsid_child_segments: HashMap::from_iter(
-bla.children.iter().map(|(k, v)| (k.to_string(), v.into())),
-let mut b = Blah::default();
-let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?;
-for kv in self.rollups.prefix(&prefix.to_db_bytes()?) {
-let (key_bytes, val_bytes) = kv?;
-let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
-let val = db_complete::<CountsValue>(&val_bytes)?;
-node.counts.merge(&val);
-for segment in key.collection().split('.') {
-node = node.children.entry(segment.to_string()).or_default();
-node.counts.merge(&val);
-fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
-// 0. grab a snapshot in case rollups happen while we're working
-let instant = self.keyspace.instant();
-let global = self.global.snapshot_at(instant);
-let rollups = self.rollups.snapshot_at(instant);
-let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?;
-let mut total_counts = rollups
-.map(db_complete::<CountsValue>)
-// 2. live counts that haven't been rolled into all-time yet.
-get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or(
-StorageError::BadStateError("Could not find current rollup cursor".to_string()),
-let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
-for kv in rollups.range(full_range) {
-let (key_bytes, val_bytes) = kv?;
-let key = db_complete::<LiveCountsKey>(&key_bytes)?;
-if key.collection() == collection {
-let counts = db_complete::<CountsValue>(&val_bytes)?;
-total_counts.merge(&counts);
-total_counts.records(),
-total_counts.dids().estimate() as u64,
fn get_records_by_collections(
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>> {
···
let mut record_iterators = Vec::new();
for collection in collections {
-let iter = RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)?;
record_iterators.push(iter.peekable());
let mut merged = Vec::new();
···
tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
-async fn get_top_collections(&self) -> Result<TopCollections, StorageError> {
-tokio::task::spawn_blocking(move || FjallReader::get_top_collections(&s)).await?
-async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
let collection = collection.clone();
-tokio::task::spawn_blocking(move || FjallReader::get_counts_by_collection(&s, &collection))
async fn get_records_by_collections(
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>> {
-let collections = collections.to_vec();
tokio::task::spawn_blocking(move || {
-FjallReader::get_records_by_collections(&s, &collections, limit, expand_each_collection)
···
timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
cursor_exclusive_limit: Option<Cursor>,
-) -> StorageResult<usize> {
// current strategy is to buffer counts in mem before writing the rollups
// we *could* read+write every single batch to rollup.. but their merge is associative so
// ...so save the db some work up front? is this worth it? who knows...
#[derive(Eq, Hash, PartialEq)]
···
batch.remove(&self.rollups, key_bytes);
let val = db_complete::<CountsValue>(&val_bytes)?;
···
last_cursor = key.cursor();
for ((nsid, rollup), counts) in counts_by_rollup {
-let key_bytes = match rollup {
Rollup::Hourly(hourly_cursor) => {
-let k = HourlyRollupKey::new(hourly_cursor, &nsid);
Rollup::Weekly(weekly_cursor) => {
-let k = WeeklyRollupKey::new(weekly_cursor, &nsid);
-let k = AllTimeRollupKey::new(&nsid);
let mut rolled: CountsValue = self
.map(db_complete::<CountsValue>)
-// try to round-trip before inserting, for funsies
-let tripppin = counts.to_db_bytes()?;
-let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?;
-assert_eq!(n, tripppin.len());
-assert_eq!(counts.prefix, and_back.prefix);
-assert_eq!(counts.dids().estimate(), and_back.dids().estimate());
-if counts.records() > 200_000_000_000 {
-panic!("COUNTS maybe wtf? {counts:?}")
-batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?);
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
-impl StoreWriter for FjallWriter {
fn insert_batch<const LIMIT: usize>(
event_batch: EventBatch<LIMIT>,
···
let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate);
&live_counts_key.to_db_bytes()?,
···
-fn step_rollup(&mut self) -> StorageResult<usize> {
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
···
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
let mut timely_iter = self.rollups.range(live_counts_range).peekable();
-let timely_next_cursor = timely_iter
-.map(|kv| -> StorageResult<Cursor> {
Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
let key = db_complete::<LiveCountsKey>(key_bytes)?;
···
-let cursors_stepped = match (timely_next_cursor, next_delete) {
-Some(timely_next_cursor),
-Some((delete_cursor, delete_key_bytes, delete_val_bytes)),
-if timely_next_cursor < delete_cursor {
-self.rollup_live_counts(
MAX_BATCHED_ROLLUP_COUNTS,
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
-self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
···
-// TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end)
-) -> StorageResult<()> {
let mut dangling_feed_keys_cleaned = 0;
let mut records_deleted = 0;
-let mut batch = self.keyspace.batch();
-let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
-for kv in self.feeds.prefix(prefix).rev() {
let (key_bytes, val_bytes) = kv?;
let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
···
let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
// record was deleted (hopefully)
-batch.remove(&self.feeds, &location_key_bytes);
dangling_feed_keys_cleaned += 1;
let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
if meta.cursor() != feed_key.cursor() {
// older/different version
-batch.remove(&self.feeds, &location_key_bytes);
dangling_feed_keys_cleaned += 1;
if meta.rev != feed_val.rev() {
log::warn!("record lookup: cursor match but rev did not...? removing.");
-batch.remove(&self.feeds, &location_key_bytes);
dangling_feed_keys_cleaned += 1;
-if batch.len() >= MAX_BATCHED_CLEANUP_SIZE {
-batch = self.keyspace.batch();
-batch.remove(&self.feeds, &location_key_bytes);
-batch.remove(&self.records, &location_key_bytes);
-log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records");
fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
···
···
/// Set a value to a fixed key
fn insert_batch_static_neu<K: StaticStr>(
···
////////// temp stuff to remove:
-// fn summarize_batch<const LIMIT: usize>(batch: &EventBatch<LIMIT>) -> String {
-// "batch of {: >3} samples from {: >4} records in {: >2} collections from ~{: >4} DIDs, {} acct removes, cursor {: <12?}",
-// batch.total_records(),
-// batch.total_collections(),
-// batch.estimate_dids(),
-// batch.account_removes(),
-// batch.latest_cursor().map(|c| c.elapsed()),
···
use serde_json::value::RawValue;
fn fjall_db() -> (FjallReader, FjallWriter) {
-let (read, write, _) = FjallStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
···
const TEST_BATCH_LIMIT: usize = 16;
#[derive(Debug, Default)]
···
.entry(collection.clone())
-.truncating_insert(commit)
···
.entry(collection.clone())
-.truncating_insert(commit)
···
.entry(collection.clone())
-.truncating_insert(commit)
···
fn test_hello() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
-read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?;
-assert_eq!(records, 0);
···
write.insert_batch(batch.batch)?;
-let (records, dids) = read.get_counts_by_collection(&collection)?;
-assert_eq!(records, 1);
-read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?;
-assert_eq!(records, 0);
-let records = read.get_records_by_collections(&[collection], 2, false)?;
assert_eq!(records.len(), 1);
assert_eq!(rec.record.get(), "{}");
-read.get_records_by_collections(&[Nsid::new("d.e.f".to_string()).unwrap()], 2, false)?;
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
Nsid::new("a.a.a".to_string()).unwrap(),
Nsid::new("a.a.b".to_string()).unwrap(),
Nsid::new("a.a.c".to_string()).unwrap(),
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
Nsid::new("a.a.a".to_string()).unwrap(),
Nsid::new("a.a.b".to_string()).unwrap(),
Nsid::new("a.a.c".to_string()).unwrap(),
···
write.insert_batch(batch.batch)?;
-let (records, dids) = read.get_counts_by_collection(&collection)?;
-assert_eq!(records, 1);
-let records = read.get_records_by_collections(&[collection], 2, false)?;
assert_eq!(records.len(), 1);
assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
···
write.insert_batch(batch.batch)?;
-let (records, dids) = read.get_counts_by_collection(&collection)?;
-assert_eq!(records, 1);
-let records = read.get_records_by_collections(&[collection], 2, false)?;
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.a".to_string()).unwrap()],
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.b".to_string()).unwrap()],
assert_eq!(records.len(), 10);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.c".to_string()).unwrap()],
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.d".to_string()).unwrap()],
assert_eq!(records.len(), 0);
-write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?;
-write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6)?;
-write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6)?;
-write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?;
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.a".to_string()).unwrap()],
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.b".to_string()).unwrap()],
assert_eq!(records.len(), 6);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.c".to_string()).unwrap()],
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.d".to_string()).unwrap()],
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.a".to_string()).unwrap()],
···
assert_eq!(records_deleted, 2);
let records = read.get_records_by_collections(
-&[Nsid::new("a.a.a".to_string()).unwrap()],
···
-read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?;
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
-let n = write.step_rollup()?;
let mut batch = TestBatch::default();
batch.delete_account("did:plc:person-a", 10_001);
write.insert_batch(batch.batch)?;
-read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?;
assert_eq!(records.len(), 1);
-let n = write.step_rollup()?;
-read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?;
assert_eq!(records.len(), 0);
let mut batch = TestBatch::default();
batch.delete_account("did:plc:person-a", 9_999);
write.insert_batch(batch.batch)?;
-let n = write.step_rollup()?;
···
write.insert_batch(batch.batch)?;
-let n = write.step_rollup()?;
-let n = write.step_rollup()?;
···
write.insert_batch(batch.batch)?;
-read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-assert_eq!(records, 3);
-let n = write.step_rollup()?;
-read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-assert_eq!(records, 3);
// delete account rolled up
-let n = write.step_rollup()?;
-read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-assert_eq!(records, 3);
// second batch rolled up
-let n = write.step_rollup()?;
-read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-assert_eq!(records, 3);
-let n = write.step_rollup()?;
-fn get_top_collections() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
let mut batch = TestBatch::default();
···
write.insert_batch(batch.batch)?;
-let n = write.step_rollup()?;
-assert_eq!(n, 3); // 3 collections
-let tops = read.get_top_collections()?;
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([
-nsid_child_segments: HashMap::from([
-nsid_child_segments: HashMap::from([]),
-nsid_child_segments: HashMap::from([]),
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([]),
-fn get_top_collections_with_parent_nsid() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
let mut batch = TestBatch::default();
-"did:plc:inze6wrmsm7pjl7yta3oig77",
-"did:plc:inze6wrmsm7pjl7yta3oig77",
write.insert_batch(batch.batch)?;
-let n = write.step_rollup()?;
-assert_eq!(n, 2); // 3 collections
-let tops = read.get_top_collections()?;
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([(
-nsid_child_segments: HashMap::from([]),
-// TODO: handle leaf node counts explicitly, since parent NSIDs can be leaves themselves
···
+use crate::db_types::{
+db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
+};
use crate::error::StorageError;
+use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
+AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
+DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
+HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
+JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
+NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
+RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
+SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
+WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
};
+use crate::{
+nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
+NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
+};
use async_trait::async_trait;
+use fjall::{
+Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
+};
use jetstream::events::Cursor;
+use std::collections::{HashMap, HashSet};
+use std::iter::Peekable;
+use std::sync::{
+atomic::{AtomicBool, Ordering},
+Arc,
+};
+use std::time::{Duration, Instant, SystemTime};
const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
···
/// - key: "takeoff" (literal)
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
+/// - Cardinality estimator secret
+/// - key: "sketch_secret" (literal)
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
/// - key: "rollup_cursor" (literal)
/// - val: u64 (tracks behind js_cursor)
+/// - Feed trim cursor (bg work: delete oldest excess records)
+/// - key: "trim_cursor" || nullstr (nsid)
+/// - val: u64 (earliest previously-removed feed entry jetstream cursor)
···
/// - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
/// - Hourly total record counts and dids estimate per collection
/// - key: "hourly_counts" || u64 || nullstr (hour, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
+/// - Hourly record count ranking
+/// - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
+/// - Hourly did estimate ranking
+/// - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
/// - Weekly total record counts and dids estimate per collection
+/// - key: "weekly_counts" || u64 || nullstr (week, nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
+/// - Weekly record count ranking
+/// - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
+/// - Weekly did estimate ranking
+/// - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
/// - All-time total record counts and dids estimate per collection
/// - key: "ever_counts" || nullstr (nsid)
/// - val: u64 || HLL (count (not cursor), estimator)
+/// - All-time total record count ranking
+/// - key: "ever_rank_records" || u64 || nullstr (count, nsid)
+/// - All-time did estimate ranking
+/// - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
···
+impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
+) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
let config = Config::new(path);
+// let config = config.fsync_ms(Some(4_000));
···
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
+let sketch_secret = if js_cursor.is_some() {
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
"found cursor but missing js_endpoint, refusing to start.".to_string(),
+let Some(stored_secret) =
+get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
+return Err(StorageError::InitError(
+"found cursor but missing sketch_secret, refusing to start.".to_string(),
···
return Err(StorageError::InitError(format!(
+"stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
+log::info!("initializing a fresh db!");
+init_static_neu::<JetstreamEndpointKey>(
JetstreamEndpointValue(endpoint.to_string()),
+log::info!("generating new secret for cardinality sketches...");
+let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
+getrandom::fill(&mut sketch_secret).map_err(|e| {
+StorageError::InitError(format!(
+"failed to get a random secret for cardinality sketches: {e:?}"
+init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
+init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
+init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
let reader = FjallReader {
keyspace: keyspace.clone(),
···
rollups: rollups.clone(),
let writer = FjallWriter {
+bg_taken: Arc::new(AtomicBool::new(false)),
···
+Ok((reader, writer, js_cursor, sketch_secret))
···
+type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
+type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
+type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;
+fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
+) -> StorageResult<NsidCounter> {
+Ok(Box::new(snapshot.range((start, end)).map(|kv| {
+let (k_bytes, v_bytes) = kv?;
+let key = db_complete::<T>(&k_bytes)?;
+let nsid = key.collection().clone();
+let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
+type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
+fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
+snapshot: lsm_tree::Snapshot,
+get_rollup_key: GetRollupKey,
+) -> StorageResult<NsidCounter> {
+Ok(Box::new(snapshot.range((start, end)).rev().map(
+let (k_bytes, _) = kv?;
+let key = db_complete::<T>(&k_bytes)?;
+let nsid = key.collection().clone();
+let get_counts: GetCounts = Box::new({
+let nsid = nsid.clone();
+let snapshot = snapshot.clone();
+let get_rollup_key = get_rollup_key.clone();
+let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
+"integrity: all-time rank rollup must have corresponding all-time count rollup",
+Ok(db_complete::<CountsValue>(&db_count_bytes)?)
+type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
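// Illustrative aside (not part of the diff): the boxed-closure pattern above
// defers decoding a CountsValue until a ranked entry actually survives the
// cut. A self-contained sketch of the same idea with plain types:
fn lazy_decode_demo() {
    type Lazy<T> = Box<dyn FnOnce() -> T>;
    let pairs: Vec<(&str, Lazy<u64>)> = vec![
        ("a.b.c", Box::new(|| 42)), // decode work happens only if called
        ("d.e.f", Box::new(|| 7)),
    ];
    for (nsid, get_count) in pairs.into_iter().take(1) {
        println!("{nsid}: {}", get_count()); // only surviving entries are decoded
    }
}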
fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
···
get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
.map(|c| c.to_raw_u64());
+get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
+.map(|c| c.to_raw_u64());
Ok(ConsumerInfo::Jetstream {
+fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
+.unwrap_or(&self.rollups.snapshot())
+.prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
+.map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
+.map(|key| key.cursor())
+.unwrap_or_else(|| Cursor::from_start().into());
+fn get_lexi_collections(
+cursor: Option<Vec<u8>>,
+buckets: Vec<CursorBucket>,
+) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
+let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
+let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
+for bucket in &buckets {
+let it: NsidCounter = match bucket {
+CursorBucket::Hour(t) => {
+let start = cursor_nsid
+.map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
+.unwrap_or_else(|| HourlyRollupKey::start(*t))?;
+let end = HourlyRollupKey::end(*t)?;
+get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
+CursorBucket::Week(t) => {
+let start = cursor_nsid
+.map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
+.unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
+let end = WeeklyRollupKey::end(*t)?;
+get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
+CursorBucket::AllTime => {
+let start = cursor_nsid
+.map(AllTimeRollupKey::after_nsid)
+.unwrap_or_else(AllTimeRollupKey::start)?;
+let end = AllTimeRollupKey::end()?;
+get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
+iters.push(it.peekable());
+let mut out = Vec::new();
+let mut current_nsid = None;
+// double-scan the iters for each element: this could be eliminated but we're starting simple.
+// first scan: find the lowest nsid
+// second scan: take + merge, and advance all iters with lowest nsid
+let mut lowest: Option<Nsid> = None;
+for iter in &mut iters {
+if let Some(bla) = iter.peek_mut() {
+let (nsid, _) = match bla {
+Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
+lowest = match lowest {
+Some(ref current) if nsid.as_str() > current.as_str() => lowest,
+_ => Some(nsid.clone()),
+current_nsid = lowest.clone();
+let Some(nsid) = lowest else { break };
+let mut merged = CountsValue::default();
+for iter in &mut iters {
+// unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
+if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
+let counts = get_counts()?;
+nsid: nsid.to_string(),
+creates: merged.counts().creates,
+dids_estimate: merged.dids().estimate() as u64,
+let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
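// Illustrative aside (not part of the diff): the two-scan merge above is a
// k-way merge over sorted iterators. A minimal sketch with integers standing
// in for NSIDs, assuming each input is already sorted ascending:
fn merge_step(iters: &mut [std::iter::Peekable<std::vec::IntoIter<u32>>]) -> Option<u32> {
    // first scan: find the lowest head across all iterators
    let lowest = iters.iter_mut().filter_map(|it| it.peek().copied()).min()?;
    // second scan: advance (and merge) every iterator sitting on that value
    for it in iters.iter_mut() {
        while it.peek() == Some(&lowest) {
            it.next();
        }
    }
    Some(lowest)
}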
+fn get_ordered_collections(
+order: OrderCollectionsBy,
+buckets: Vec<CursorBucket>,
+) -> StorageResult<Vec<NsidCount>> {
+let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
+for bucket in buckets {
+let it: NsidCounter = match (&order, bucket) {
+(OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
+get_lookup_iter::<HourlyRecordsKey>(
+HourlyRecordsKey::start(t)?,
+HourlyRecordsKey::end(t)?,
+move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
+(OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
+get_lookup_iter::<HourlyDidsKey>(
+HourlyDidsKey::start(t)?,
+HourlyDidsKey::end(t)?,
+move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
+(OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
+get_lookup_iter::<WeeklyRecordsKey>(
+WeeklyRecordsKey::start(t)?,
+WeeklyRecordsKey::end(t)?,
+move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
+(OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
+get_lookup_iter::<WeeklyDidsKey>(
+WeeklyDidsKey::start(t)?,
+WeeklyDidsKey::end(t)?,
+move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
+(OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
+get_lookup_iter::<AllTimeRecordsKey>(
+AllTimeRecordsKey::start()?,
+AllTimeRecordsKey::end()?,
+Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
+(OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
+get_lookup_iter::<AllTimeDidsKey>(
+AllTimeDidsKey::start()?,
+AllTimeDidsKey::end()?,
+Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
+(OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
+// overfetch by taking a bit more than the limit
+// sort by requested order, take limit, discard all remaining
+// this isn't guaranteed to be correct, but it will hopefully be close most of the time:
+// - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
+// - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah.
+// this thing is heavy, there's probably a better way
+let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
+for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
+let (nsid, get_counts) = pair?;
+let counts = get_counts()?;
+ranked.entry(nsid).or_default().merge(&counts);
+let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
+OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
+OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
+OrderCollectionsBy::Lexi { .. } => unreachable!(),
+.map(|(nsid, cv)| NsidCount {
+nsid: nsid.to_string(),
+creates: cv.counts().creates,
+dids_estimate: cv.dids().estimate() as u64,
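// Illustrative aside (not part of the diff): the overfetch heuristic above in
// miniature. With limit = 10 the code pulls ceil(10 * 1.3) = 13 entries per
// bucket, merges duplicates, sorts by the requested order, and keeps 10; NSIDs
// ranking just below a per-bucket cutoff can still be missed.
fn overfetch_top_n(mut merged: Vec<(String, u64)>, limit: usize) -> Vec<(String, u64)> {
    merged.sort_by_key(|(_, score)| std::cmp::Reverse(*score)); // requested order, descending
    merged.truncate(limit); // discard the overfetched tail
    merged
}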
+order: OrderCollectionsBy,
+since: Option<HourTruncatedCursor>,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
+let snapshot = self.rollups.snapshot();
+let buckets = if let (None, None) = (since, until) {
+vec![CursorBucket::AllTime]
+let mut lower = self.get_earliest_hour(Some(&snapshot))?;
+if let Some(specified) = since {
+let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+CursorBucket::buckets_spanning(lower, upper)
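// Illustrative aside (not part of the diff): buckets_spanning is assumed to
// cover the requested span with the coarsest rollups available, e.g. a span of
// a few weeks becoming [Hour.., Week, Week, Hour..], so that far fewer keys
// need to be read and merged per query than with hourly buckets alone.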
+OrderCollectionsBy::Lexi { cursor } => {
+self.get_lexi_collections(snapshot, limit, cursor, buckets)
+self.get_ordered_collections(snapshot, limit, order, buckets)?,
+cursor: Option<Vec<u8>>,
+buckets: Vec<CursorBucket>,
+) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
+// let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
+let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
+let cursor_child = cursor
+let decoded: String = db_complete(encoded_bytes)?;
+// TODO: write some tests for cursors, there's probably bugs here
+let as_sub_prefix_with_null = decoded.to_db_bytes()?;
+Ok::<_, EncodingError>(as_sub_prefix_with_null)
+let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
+for bucket in &buckets {
+let it: NsidCounter = match bucket {
+CursorBucket::Hour(t) => {
+let start = cursor_child
+.map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
+.unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
+let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
+get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
+CursorBucket::Week(t) => {
+let start = cursor_child
+.map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
+.unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
+let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
+get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
+CursorBucket::AllTime => {
+let start = cursor_child
+.map(|child| AllTimeRollupKey::after_nsid_prefix(child))
+.unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
+let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
+get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
+let mut iters: Vec<_> = iters
+let Some(child) = Child::from_prefix(&nsid, &prefix) else {
+panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
+let mut items = Vec::new();
+let mut prefix_count = CountsValue::default();
+#[derive(Debug, Clone, PartialEq)]
+fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
+if prefix.is_group_of(nsid) {
+return Some(Child::FullNsid(nsid.to_string()));
+let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
+let (segment, _) = suffix.split_once('.').unwrap();
+let child_prefix = format!("{}.{segment}", prefix.0);
+Some(Child::ChildPrefix(child_prefix))
+fn is_before(&self, other: &Child) -> bool {
+(Child::FullNsid(s), Child::ChildPrefix(o)) if s == o => true,
+(Child::ChildPrefix(s), Child::FullNsid(o)) if s == o => false,
+(Child::FullNsid(s), Child::FullNsid(o)) => s < o,
+(Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
+(Child::FullNsid(s), Child::ChildPrefix(o)) => s < o,
+(Child::ChildPrefix(s), Child::FullNsid(o)) => s < o,
+fn into_inner(self) -> String {
+Child::FullNsid(s) => s,
+Child::ChildPrefix(s) => s,
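// Illustrative aside (not part of the diff): worked examples of the assumed
// Child semantics. For prefix "app.bsky":
//   nsid "app.bsky.post"      -> FullNsid("app.bsky.post") (a direct member)
//   nsid "app.bsky.feed.post" -> ChildPrefix("app.bsky.feed") (first extra segment kept)
// and is_before breaks string ties by ordering a FullNsid ahead of an equal
// ChildPrefix, so exact collections sort before same-named sub-prefixes.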
+let mut current_child: Option<Child> = None;
+// double-scan the iters for each element: this could be eliminated but we're starting simple.
+// first scan: find the lowest nsid
+// second scan: take + merge, and advance all iters with lowest nsid
+let mut lowest: Option<Child> = None;
+for iter in &mut iters {
+if let Some(bla) = iter.peek_mut() {
+let (child, _) = match bla {
+Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
+lowest = match lowest {
+Some(ref current) if current.is_before(child) => lowest,
+_ => Some(child.clone()),
+current_child = lowest.clone();
+let Some(child) = lowest else { break };
+let mut merged = CountsValue::default();
+for iter in &mut iters {
+// unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
+while let Some(Ok((_, get_counts))) =
+iter.next_if(|v| v.as_ref().unwrap().0 == child)
+let counts = get_counts()?;
+prefix_count.merge(&counts);
+items.push(match child {
+Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount {
+creates: merged.counts().creates,
+dids_estimate: merged.dids().estimate() as u64,
+Child::ChildPrefix(prefix) => PrefixChild::Prefix(PrefixCount {
+creates: merged.counts().creates,
+dids_estimate: merged.dids().estimate() as u64,
+// TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
+// ....er the sketch is probably too big
+// TODO: this is probably buggy on child-type boundaries bleh
+let next_cursor = current_child
+.map(|s| s.into_inner().to_db_bytes())
+Ok(((&prefix_count).into(), items, next_cursor))
+order: OrderCollectionsBy,
+since: Option<HourTruncatedCursor>,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
+let snapshot = self.rollups.snapshot();
+let buckets = if let (None, None) = (since, until) {
+vec![CursorBucket::AllTime]
+let mut lower = self.get_earliest_hour(Some(&snapshot))?;
+if let Some(specified) = since {
+let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+CursorBucket::buckets_spanning(lower, upper)
+OrderCollectionsBy::Lexi { cursor } => {
+self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
+/// - step: output series time step, in seconds
+collections: Vec<Nsid>,
+since: HourTruncatedCursor,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
+if step > WEEK_IN_MICROS {
+panic!("week-stepping is todo");
+let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
+// empty: until < since
+collections.into_iter().map(|c| (c, vec![])).collect(),
+let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
+let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
+let snapshot = self.rollups.snapshot();
+for hour in (0..n_hours).map(|i| since.nth_next(i)) {
+let mut counts = Vec::with_capacity(collections.len());
+for nsid in &collections {
+.get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
+.map(db_complete::<CountsValue>)
+counts_by_hour.push((hour, counts));
+let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
+let mut output_hours = Vec::with_capacity(step_hours as usize);
+let mut output_series: CollectionSerieses = collections
+.map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
+for chunk in counts_by_hour.chunks(step_hours as usize) {
+output_hours.push(chunk[0].0); // always guaranteed to have at least one element in a chunks chunk
+for (i, collection) in collections.iter().enumerate() {
+let mut c = CountsValue::default();
+for (_, counts) in chunk {
+.expect("output series is initialized with all collections")
+Ok((output_hours, output_series))
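// Illustrative aside (not part of the diff): worked example of the chunking
// above, with assumed values. For a since..until span of 3 days and
// step = 86_400 s: n_hours = 72 hourly buckets are fetched, step_hours =
// 86_400 / (HOUR_IN_MICROS / 1_000_000) = 86_400 / 3_600 = 24, and chunks(24)
// merges them into 3 output points, each stamped with its chunk's first hour.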
+fn get_collection_counts(
+since: HourTruncatedCursor,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<JustCount> {
+// grab snapshots in case rollups happen while we're working
+let rollups = self.rollups.snapshot();
+let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+let buckets = CursorBucket::buckets_spanning(since, until);
+let mut total_counts = CountsValue::default();
+for bucket in buckets {
+let key = match bucket {
+CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
+CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
+CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
+.map(db_complete::<CountsValue>)
+total_counts.merge(&count);
+Ok((&total_counts).into())
fn get_records_by_collections(
+collections: HashSet<Nsid>,
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>> {
···
let mut record_iterators = Vec::new();
for collection in collections {
+let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
record_iterators.push(iter.peekable());
let mut merged = Vec::new();
···
tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
+async fn get_collections(
+order: OrderCollectionsBy,
+since: Option<HourTruncatedCursor>,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
+tokio::task::spawn_blocking(move || {
+FjallReader::get_collections(&s, limit, order, since, until)
+order: OrderCollectionsBy,
+since: Option<HourTruncatedCursor>,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
+tokio::task::spawn_blocking(move || {
+FjallReader::get_prefix(&s, prefix, limit, order, since, until)
+async fn get_timeseries(
+collections: Vec<Nsid>,
+since: HourTruncatedCursor,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
+tokio::task::spawn_blocking(move || {
+FjallReader::get_timeseries(&s, collections, since, until, step)
+async fn get_collection_counts(
+since: HourTruncatedCursor,
+until: Option<HourTruncatedCursor>,
+) -> StorageResult<JustCount> {
let collection = collection.clone();
+tokio::task::spawn_blocking(move || {
+FjallReader::get_collection_counts(&s, &collection, since, until)
async fn get_records_by_collections(
+collections: HashSet<Nsid>,
expand_each_collection: bool,
) -> StorageResult<Vec<UFOsRecord>> {
tokio::task::spawn_blocking(move || {
+FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
+bg_taken: Arc<AtomicBool>,
···
timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
cursor_exclusive_limit: Option<Cursor>,
+) -> StorageResult<(usize, HashSet<Nsid>)> {
// current strategy is to buffer counts in mem before writing the rollups
// we *could* read+write every single batch to rollup.. but their merge is associative so
// ...so save the db some work up front? is this worth it? who knows...
+let mut dirty_nsids = HashSet::new();
#[derive(Eq, Hash, PartialEq)]
···
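// Illustrative aside (not part of the diff): buffering works because the merge
// is associative (and commutative across distinct events), so pre-combining in
// memory gives the same result as merging each batch into the db one at a
// time. A sketch with u64 counts standing in for CountsValue:
fn buffer_rollups(events: &[(String, u64)]) -> std::collections::HashMap<String, u64> {
    let mut buffered = std::collections::HashMap::new();
    for (nsid, count) in events {
        *buffered.entry(nsid.clone()).or_insert(0) += *count; // in-memory merge per event
    }
    buffered // one read-modify-write per distinct key instead of one per event
}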
+dirty_nsids.insert(key.collection().clone());
batch.remove(&self.rollups, key_bytes);
let val = db_complete::<CountsValue>(&val_bytes)?;
···
last_cursor = key.cursor();
+// go through each new rollup thing and merge it with whatever might already be in the db
for ((nsid, rollup), counts) in counts_by_rollup {
+let rollup_key_bytes = match rollup {
Rollup::Hourly(hourly_cursor) => {
+HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
Rollup::Weekly(weekly_cursor) => {
+WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
+Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
let mut rolled: CountsValue = self
+.get(&rollup_key_bytes)?
.map(db_complete::<CountsValue>)
+// now that we have values, we can know the existing ranks
+let before_creates_count = rolled.counts().creates;
+let before_dids_estimate = rolled.dids().estimate() as u64;
+let new_creates_count = rolled.counts().creates;
+let new_dids_estimate = rolled.dids().estimate() as u64;
+// update create-ranked secondary index if rank changed
+if new_creates_count != before_creates_count {
+let (old_k, new_k) = match rollup {
+Rollup::Hourly(cursor) => (
+HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+Rollup::Weekly(cursor) => (
+WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
+AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
+batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+batch.insert(&self.rollups, &new_k, "");
+// update dids-ranked secondary index if rank changed
+if new_dids_estimate != before_dids_estimate {
+let (old_k, new_k) = match rollup {
+Rollup::Hourly(cursor) => (
+HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+Rollup::Weekly(cursor) => (
+WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
+AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
+batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+batch.insert(&self.rollups, &new_k, "");
+// replace the main counts rollup
+batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
+Ok((cursors_advanced, dirty_nsids))
+impl StoreWriter<FjallBackground> for FjallWriter {
+fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
+if self.bg_taken.swap(true, Ordering::SeqCst) {
+Err(StorageError::BackgroundAlreadyStarted)
+log::info!("reroll: resetting rollup cursor...");
+insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
+log::info!("reroll: clearing trim cursors...");
+let mut batch = self.keyspace.batch();
+.prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
+batch.remove(&self.global, k);
+log::info!("reroll: cleared {n} trim cursors.");
+Ok(FjallBackground(self.clone()))
fn insert_batch<const LIMIT: usize>(
event_batch: EventBatch<LIMIT>,
···
let live_counts_key: LiveCountsKey = (latest, &nsid).into();
+let counts_value = CountsValue::new(
+creates: commits.creates as u64,
+updates: commits.updates as u64,
+deletes: commits.deletes as u64,
&live_counts_key.to_db_bytes()?,
···
+fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
+let mut dirty_nsids = HashSet::new();
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
···
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
let mut timely_iter = self.rollups.range(live_counts_range).peekable();
+let timely_next = timely_iter
+.map(|kv| -> StorageResult<LiveCountsKey> {
Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
let key = db_complete::<LiveCountsKey>(key_bytes)?;
···
+let cursors_stepped = match (timely_next, next_delete) {
+(Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
+if timely.cursor() < delete_cursor {
+let (n, dirty) = self.rollup_live_counts(
MAX_BATCHED_ROLLUP_COUNTS,
+dirty_nsids.extend(dirty);
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
+self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?;
+dirty_nsids.extend(dirty);
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
···
+Ok((cursors_stepped, dirty_nsids))
+) -> StorageResult<(usize, usize, bool)> {
let mut dangling_feed_keys_cleaned = 0;
let mut records_deleted = 0;
+let live_range = if full_scan {
+let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
+let end = NsidRecordFeedKey::prefix_range_end(collection)?;
+let feed_trim_cursor_key =
+TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
+.get(&feed_trim_cursor_key)?
+.map(|value_bytes| db_complete(&value_bytes))
+.unwrap_or(Cursor::from_start());
+NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
+let mut live_records_found = 0;
+let mut candidate_new_feed_lower_cursor = None;
+let ended_early = false;
+let mut current_cursor: Option<Cursor> = None;
+for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
+if i > 0 && i % 500_000 == 0 {
+"trim: at {i} for {:?} (now at {})",
+collection.to_string(),
+.unwrap_or("[not past]".into()))
+.unwrap_or("??".into()),
let (key_bytes, val_bytes) = kv?;
let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
···
let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
// record was deleted (hopefully)
+self.feeds.remove(&*key_bytes)?;
dangling_feed_keys_cleaned += 1;
let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
+current_cursor = Some(meta.cursor());
if meta.cursor() != feed_key.cursor() {
// older/different version
+self.feeds.remove(&*key_bytes)?;
dangling_feed_keys_cleaned += 1;
if meta.rev != feed_val.rev() {
log::warn!("record lookup: cursor match but rev did not...? removing.");
+self.records.remove(&location_key_bytes)?;
+self.feeds.remove(&*key_bytes)?;
dangling_feed_keys_cleaned += 1;
+live_records_found += 1;
+if live_records_found <= limit {
+if candidate_new_feed_lower_cursor.is_none() {
+candidate_new_feed_lower_cursor = Some(feed_key.cursor());
+self.feeds.remove(&location_key_bytes)?;
+self.feeds.remove(key_bytes)?;
+if let Some(new_cursor) = candidate_new_feed_lower_cursor {
+&TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
+&new_cursor.to_db_bytes()?,
+log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
+Ok((dangling_feed_keys_cleaned, records_deleted, ended_early))
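// Illustrative aside (not part of the diff): worked example of the trim above.
// With limit = 6 and 10 live records in a collection's feed, the newest 6 are
// kept; the 4 older live records (plus any dangling feed entries encountered
// along the way) are removed, and the stored trim cursor is advanced so later
// passes can skip the already-cleared range.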
fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
···
+pub struct FjallBackground(FjallWriter);
+impl StoreBackground for FjallBackground {
+async fn run(mut self, backfill: bool) -> StorageResult<()> {
+let mut dirty_nsids = HashSet::new();
+// backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims
+// shorter once those are done helps things catch up
+// the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine
+tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
+rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+// backfill condition again iffy. collection trims should probably happen in their own phase.
+let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
+trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+let mut db = self.0.clone();
+let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
+rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
+dirty_nsids.extend(dirty);
+log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
+let n = dirty_nsids.len();
+log::trace!("trimming {n} nsids: {dirty_nsids:?}");
+let t0 = Instant::now();
+let (mut total_danglers, mut total_deleted) = (0, 0);
+let mut completed = HashSet::new();
+for collection in &dirty_nsids {
+let mut db = self.0.clone();
+let c = collection.clone();
+let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
+total_danglers += danglers;
+total_deleted += deleted;
+completed.insert(collection.clone());
+if total_deleted > 10_000_000 {
+log::info!("trim stopped early, more than 10M records already deleted.");
+dirty_nsids.remove(&c);
+log::info!("finished trimming {n} nsids in {:?}: {total_danglers} dangling and {total_deleted} total removed.", t0.elapsed());
···
+/// Set a value to a fixed key, erroring if the value already exists
+/// Intended for single-threaded init: not safe under concurrency, since there
+/// is no transaction between checking if the value already exists and writing it.
+fn init_static_neu<K: StaticStr>(
+global: &PartitionHandle,
+) -> StorageResult<()> {
+let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
+if global.get(&key_bytes)?.is_some() {
+return Err(StorageError::InitError(format!(
+"init failed: value for key {key_bytes:?} already exists"
+let value_bytes = value.to_db_bytes()?;
+global.insert(&key_bytes, &value_bytes)?;
/// Set a value to a fixed key
fn insert_batch_static_neu<K: StaticStr>(
···
////////// temp stuff to remove:
···
use serde_json::value::RawValue;
fn fjall_db() -> (FjallReader, FjallWriter) {
+let (read, write, _, _) = FjallStorage::init(
tempfile::tempdir().unwrap(),
"offline test (no real jetstream endpoint)".to_string(),
···
const TEST_BATCH_LIMIT: usize = 16;
+fn beginning() -> HourTruncatedCursor {
+Cursor::from_start().into()
#[derive(Debug, Default)]
···
.entry(collection.clone())
+.truncating_insert(commit, &[0u8; 16])
···
.entry(collection.clone())
+.truncating_insert(commit, &[0u8; 16])
···
.entry(collection.clone())
+.truncating_insert(commit, &[0u8; 16])
···
fn test_hello() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
+} = read.get_collection_counts(
+&Nsid::new("a.b.c".to_string()).unwrap(),
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
···
write.insert_batch(batch.batch)?;
+} = read.get_collection_counts(&collection, beginning(), None)?;
+assert_eq!(creates, 1);
+assert_eq!(dids_estimate, 1);
+} = read.get_collection_counts(
+&Nsid::new("d.e.f".to_string()).unwrap(),
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
+let records = read.get_records_by_collections([collection].into(), 2, false)?;
assert_eq!(records.len(), 1);
assert_eq!(rec.record.get(), "{}");
+let records = read.get_records_by_collections(
+[Nsid::new("d.e.f".to_string()).unwrap()].into(),
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
Nsid::new("a.a.a".to_string()).unwrap(),
Nsid::new("a.a.b".to_string()).unwrap(),
Nsid::new("a.a.c".to_string()).unwrap(),
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
Nsid::new("a.a.a".to_string()).unwrap(),
Nsid::new("a.a.b".to_string()).unwrap(),
Nsid::new("a.a.c".to_string()).unwrap(),
···
write.insert_batch(batch.batch)?;
+} = read.get_collection_counts(&collection, beginning(), None)?;
+assert_eq!(creates, 1);
+assert_eq!(dids_estimate, 1);
+let records = read.get_records_by_collections([collection].into(), 2, false)?;
assert_eq!(records.len(), 1);
assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
···
write.insert_batch(batch.batch)?;
+} = read.get_collection_counts(&collection, beginning(), None)?;
+assert_eq!(creates, 1);
+assert_eq!(dids_estimate, 1);
+let records = read.get_records_by_collections([collection].into(), 2, false)?;
assert_eq!(records.len(), 0);
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
assert_eq!(records.len(), 10);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
assert_eq!(records.len(), 0);
+write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
+write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
+write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
+write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
assert_eq!(records.len(), 6);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
assert_eq!(records.len(), 1);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
···
write.insert_batch(batch.batch)?;
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
···
assert_eq!(records_deleted, 2);
let records = read.get_records_by_collections(
+HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
···
+let records = read.get_records_by_collections(
+[Nsid::new("a.a.a".to_string()).unwrap()].into(),
assert_eq!(records.len(), 0);
···
write.insert_batch(batch.batch)?;
+let (n, _) = write.step_rollup()?;
let mut batch = TestBatch::default();
batch.delete_account("did:plc:person-a", 10_001);
write.insert_batch(batch.batch)?;
+let records = read.get_records_by_collections(
+[Nsid::new("a.a.a".to_string()).unwrap()].into(),
assert_eq!(records.len(), 1);
+let (n, _) = write.step_rollup()?;
+let records = read.get_records_by_collections(
+[Nsid::new("a.a.a".to_string()).unwrap()].into(),
assert_eq!(records.len(), 0);
let mut batch = TestBatch::default();
batch.delete_account("did:plc:person-a", 9_999);
write.insert_batch(batch.batch)?;
+let (n, _) = write.step_rollup()?;
···
write.insert_batch(batch.batch)?;
+let (n, _) = write.step_rollup()?;
+let (n, _) = write.step_rollup()?;
···
write.insert_batch(batch.batch)?;
+} = read.get_collection_counts(
+&Nsid::new("a.a.a".to_string()).unwrap(),
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
+let (n, _) = write.step_rollup()?;
+} = read.get_collection_counts(
+&Nsid::new("a.a.a".to_string()).unwrap(),
+assert_eq!(creates, 2);
+assert_eq!(dids_estimate, 2);
// delete account rolled up
+let (n, _) = write.step_rollup()?;
+} = read.get_collection_counts(
+&Nsid::new("a.a.a".to_string()).unwrap(),
+assert_eq!(creates, 2);
+assert_eq!(dids_estimate, 2);
// second batch rolled up
+let (n, _) = write.step_rollup()?;
+} = read.get_collection_counts(
+&Nsid::new("a.a.a".to_string()).unwrap(),
+assert_eq!(creates, 3);
+assert_eq!(dids_estimate, 2);
+let (n, _) = write.step_rollup()?;
+fn get_prefix_children_lexi_empty() {
+let (read, _) = fjall_db();
+NsidPrefix::new("aaa.aaa").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
+assert_eq!(children, vec![]);
+assert_eq!(cursor, None);
+fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
let mut batch = TestBatch::default();
···
+write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
+assert_eq!(children, vec![]);
+assert_eq!(cursor, None);
+fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
+let (read, mut write) = fjall_db();
+let mut batch = TestBatch::default();
+write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 0);
+assert_eq!(dids_estimate, 0);
+assert_eq!(children, vec![]);
+assert_eq!(cursor, None);
+fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
+let (read, mut write) = fjall_db();
+let mut batch = TestBatch::default();
+write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 1);
+assert_eq!(dids_estimate, 1);
+vec![PrefixChild::Collection(NsidCount {
+nsid: "a.a.a".to_string(),
+assert_eq!(cursor, None);
+fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
+let (read, mut write) = fjall_db();
+let mut batch = TestBatch::default();
write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 1);
+assert_eq!(dids_estimate, 1);
+vec![PrefixChild::Prefix(PrefixCount {
+prefix: "a.a.a".to_string(),
+assert_eq!(cursor, None);
+fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
let (read, mut write) = fjall_db();
let mut batch = TestBatch::default();
write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 2);
+assert_eq!(dids_estimate, 1);
+vec![PrefixChild::Prefix(PrefixCount {
+prefix: "a.a.a".to_string(),
+assert_eq!(cursor, None);
+fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
+let (read, mut write) = fjall_db();
+let mut batch = TestBatch::default();
+write.insert_batch(batch.batch)?;
+NsidPrefix::new("a.a.a").unwrap(),
+OrderCollectionsBy::Lexi { cursor: None },
+assert_eq!(creates, 2);
+assert_eq!(dids_estimate, 1);
+PrefixChild::Collection(NsidCount {
+nsid: "a.a.a.a".to_string(),
+PrefixChild::Prefix(PrefixCount {
+prefix: "a.a.a.a".to_string(),
+assert_eq!(cursor, None);