Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Merge pull request #22 from at-microcosm/collection-stats

collection stats: add a `/collections/stats` endpoint (replacing `/records/total-seen`) with optional `since`/`until` time bounds, a multi-collection query extractor built on serde_qs, a reusable CORS response helper, and remove the debug-only in-memory storage backend.

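For orientation, a hypothetical exchange with the endpoint this PR introduces (the parameter names come from the diff below; the values and response numbers are invented):

    GET /collections/stats?collection=app.bsky.feed.like&collection=app.bsky.feed.post&since=2025-05-01T00:00:00Z

    {"app.bsky.feed.like": {"creates": 120, "updates": 4, "deletes": 9, "dids_estimate": 57}, ...}
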
Cargo.lock  (+14)
···
[[package]]
+name = "serde_qs"
+version = "1.0.0-rc.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cb0b9062a400c31442e67d1f2b1e7746bebd691110ebee1b7d0c7293b04fab1"
+dependencies = [
+ "itoa",
+ "percent-encoding",
+ "ryu",
+ "serde",
+ "thiserror 2.0.12",
+]
+
+[[package]]
name = "serde_spanned"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
 "semver",
 "serde",
 "serde_json",
+ "serde_qs",
 "sha2",
 "tempfile",
 "thiserror 2.0.12",
ufos/Cargo.toml  (+1)
···
semver = "1.0.26"
serde = "1.0.219"
serde_json = "1.0.140"
+serde_qs = "1.0.0-rc.3"
sha2 = "0.10.9"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
ufos/src/lib.rs  (+2 -1)
···
pub mod server;
pub mod storage;
pub mod storage_fjall;
-pub mod storage_mem;
pub mod store_types;
use crate::error::BatchInsertError;
···
#[derive(Debug, Serialize, JsonSchema)]
pub struct JustCount {
    creates: u64,
+    updates: u64,
+    deletes: u64,
    dids_estimate: u64,
}
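Since `JustCount` derives `Serialize`, each response entry presumably serializes to a flat JSON object. A minimal in-crate sketch (in-crate only, because the fields are private; the numbers are invented):

    let c = JustCount { creates: 120, updates: 4, deletes: 9, dids_estimate: 57 };
    assert_eq!(
        serde_json::to_string(&c)?,
        r#"{"creates":120,"updates":4,"deletes":9,"dids_estimate":57}"#
    );
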
ufos/src/main.rs  (+17 -57)
···
use ufos::server;
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use ufos::storage_fjall::FjallStorage;
-use ufos::storage_mem::MemStorage;
use ufos::store_types::SketchSecretPrefix;
use ufos::{nice_duration, ConsumerInfo};
···
static GLOBAL: Jemalloc = Jemalloc;
/// Aggregate links in the at-mosphere
-#[derive(Parser, Debug)]
+#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
···
    /// todo: restore this
    #[arg(long, action)]
    pause_rw: bool,
-    /// DEBUG: use an in-memory store instead of fjall
-    #[arg(long, action)]
-    in_mem: bool,
    /// reset the rollup cursor, scrape through missed things in the past (backfill)
    #[arg(long, action)]
    reroll: bool,
···
    let args = Args::parse();
    let jetstream = args.jetstream.clone();
-    if args.in_mem {
-        let (read_store, write_store, cursor, sketch_secret) = MemStorage::init(
-            args.data,
-            jetstream,
-            args.jetstream_force,
-            Default::default(),
-        )?;
-        go(
-            args.jetstream,
-            args.jetstream_fixture,
-            args.pause_writer,
-            args.backfill,
-            args.reroll,
-            read_store,
-            write_store,
-            cursor,
-            sketch_secret,
-        )
-        .await?;
-    } else {
-        let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
-            args.data,
-            jetstream,
-            args.jetstream_force,
-            Default::default(),
-        )?;
-        go(
-            args.jetstream,
-            args.jetstream_fixture,
-            args.pause_writer,
-            args.backfill,
-            args.reroll,
-            read_store,
-            write_store,
-            cursor,
-            sketch_secret,
-        )
-        .await?;
-    }
-
+    let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
+        args.data.clone(),
+        jetstream,
+        args.jetstream_force,
+        Default::default(),
+    )?;
+    go(args, read_store, write_store, cursor, sketch_secret).await?;
    Ok(())
}
-#[allow(clippy::too_many_arguments)]
async fn go<B: StoreBackground>(
-    jetstream: String,
-    jetstream_fixture: bool,
-    pause_writer: bool,
-    backfill: bool,
-    reroll: bool,
+    args: Args,
    read_store: impl StoreReader + 'static + Clone,
    mut write_store: impl StoreWriter<B> + 'static,
    cursor: Option<Cursor>,
···
    println!("starting server with storage...");
    let serving = server::serve(read_store.clone());
-    if pause_writer {
+    if args.pause_writer {
        log::info!("not starting jetstream or the write loop.");
        serving.await.map_err(|e| anyhow::anyhow!(e))?;
        return Ok(());
    }
-    let batches = if jetstream_fixture {
-        log::info!("starting with jestream file fixture: {jetstream:?}");
-        file_consumer::consume(jetstream.into(), sketch_secret, cursor).await?
+    let batches = if args.jetstream_fixture {
+        log::info!("starting with jetstream file fixture: {:?}", args.jetstream);
+        file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
    } else {
        log::info!(
            "starting consumer with cursor: {cursor:?} from {:?} ago",
            cursor.map(|c| c.elapsed())
        );
-        consumer::consume(&jetstream, cursor, false, sketch_secret).await?
+        consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
    };
-    let rolling = write_store.background_tasks(reroll)?.run(backfill);
+    let rolling = write_store
+        .background_tasks(args.reroll)?
+        .run(args.backfill);
    let storing = write_store.receive_batches(batches);
    let stating = do_update_stuff(read_store);
ufos/src/server.rs → ufos/src/server/mod.rs  (+58 -47)
···
+mod collections_query;
+mod cors;
+
use crate::index_html::INDEX_HTML;
use crate::storage::StoreReader;
use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor};
use crate::{ConsumerInfo, Cursor, JustCount, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use chrono::{DateTime, Utc};
+use collections_query::MultiCollectionQuery;
+use cors::{OkCors, OkCorsResponse};
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::Body;
···
use dropshot::ConfigLogging;
use dropshot::ConfigLoggingLevel;
use dropshot::HttpError;
-use dropshot::HttpResponseHeaders;
-use dropshot::HttpResponseOk;
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ServerBuilder;
+use http::{Response, StatusCode};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
···
}]
async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
    let spec = (*ctx.context().spec).clone();
-    ok_cors(spec)
+    OkCors(spec).into()
}
#[derive(Debug, Serialize, JsonSchema)]
···
    storage: serde_json::Value,
    consumer: ConsumerInfo,
}
-/// Get meta information about UFOs itself
+/// UFOs meta-info
#[endpoint {
    method = GET,
    path = "/meta"
···
        .await
        .map_err(failed_to_get("consumer info"))?;
-    ok_cors(MetaInfo {
+    OkCors(MetaInfo {
        storage_name: storage.name(),
        storage: storage_info,
        consumer,
    })
+    .into()
}
// TODO: replace with normal (🙃) multi-qs value somehow
···
        }
    }
}
-/// Get recent records by collection
+/// Record samples
+///
+/// Get most recent records seen in the firehose, by collection NSID
///
/// Multiple collections are supported. They will be delivered in one big array with no
/// specified order.
···
        .map(|r| r.into())
        .collect();
-    ok_cors(records)
+    OkCors(records).into()
}
#[derive(Debug, Deserialize, JsonSchema)]
-struct TotalSeenCollectionsQuery {
-    collection: String, // JsonSchema not implemented for Nsid :(
+struct CollectionsStatsQuery {
+    /// Limit stats to those seen after this UTC datetime
+    ///
+    /// default: 1 week ago
+    since: Option<DateTime<Utc>>,
+    /// Limit stats to those seen before this UTC datetime
+    ///
+    /// default: now
+    until: Option<DateTime<Utc>>,
}
-#[derive(Debug, Serialize, JsonSchema)]
-struct TotalCounts {
-    total_creates: u64,
-    dids_estimate: u64,
-}
-/// Get total records seen by collection
+/// Collection stats
+///
+/// Get record statistics for collections during a specific time period.
+///
+/// Note: the statistics are "rolled up" into hourly buckets in the background,
+/// so the data here can be as stale as that background task is behind. See the
+/// meta info endpoint to find out how up-to-date the rollup currently is. (In
+/// general it should be pretty close to live)
#[endpoint {
    method = GET,
-    path = "/records/total-seen"
+    path = "/collections/stats"
}]
-async fn get_records_total_seen(
+async fn get_collection_stats(
    ctx: RequestContext<Context>,
-    collection_query: Query<TotalSeenCollectionsQuery>,
-) -> OkCorsResponse<HashMap<String, TotalCounts>> {
+    collections_query: MultiCollectionQuery,
+    query: Query<CollectionsStatsQuery>,
+) -> OkCorsResponse<HashMap<String, JustCount>> {
    let Context { storage, .. } = ctx.context();
+    let q = query.into_inner();
+    let collections: HashSet<Nsid> = collections_query.try_into()?;
-    let query = collection_query.into_inner();
-    let collections = to_multiple_nsids(&query.collection)
-        .map_err(|reason| HttpError::for_bad_request(None, reason))?;
+    let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| {
+        let week_ago_secs = 7 * 86_400;
+        let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs);
+        Cursor::at(week_ago).into()
+    });
+
+    let until = q.until.map(dt_to_cursor).transpose()?;
    let mut seen_by_collection = HashMap::with_capacity(collections.len());
    for collection in &collections {
-        let (total_creates, dids_estimate) = storage
-            .get_counts_by_collection(collection)
+        let counts = storage
+            .get_collection_counts(collection, since, until)
            .await
            .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
-        seen_by_collection.insert(
-            collection.to_string(),
-            TotalCounts {
-                total_creates,
-                dids_estimate,
-            },
-        );
+        seen_by_collection.insert(collection.to_string(), counts);
    }
-    ok_cors(seen_by_collection)
+    OkCors(seen_by_collection).into()
}
#[derive(Debug, Serialize, JsonSchema)]
···
    order: Option<CollectionsQueryOrder>,
}
-/// Get collection with statistics
+/// List collections
+///
+/// With statistics.
///
/// ## To fetch a full list:
///
···
    let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c));
-    ok_cors(CollectionsResponse {
+    OkCors(CollectionsResponse {
        collections,
        cursor: next_cursor,
    })
+    .into()
}
#[derive(Debug, Deserialize, JsonSchema)]
···
    range: Vec<DateTime<Utc>>,
    series: HashMap<String, Vec<JustCount>>,
}
-/// Get timeseries data
+/// Collection timeseries stats
#[endpoint {
    method = GET,
    path = "/timeseries"
···
    let step = if let Some(secs) = q.step {
        if secs < 3600 {
            let msg = format!("step is too small: {}", secs);
-            return Err(HttpError::for_bad_request(None, msg));
+            Err(HttpError::for_bad_request(None, msg))?;
        }
        (secs / 3600) * 3600 // truncate to hour
    } else {
···
        .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect()))
        .collect();
-    ok_cors(CollectionTimeseriesResponse { range, series })
+    OkCors(CollectionTimeseriesResponse { range, series }).into()
}
pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> {
···
    api.register(get_openapi).unwrap();
    api.register(get_meta_info).unwrap();
    api.register(get_records_by_collections).unwrap();
-    api.register(get_records_total_seen).unwrap();
+    api.register(get_collection_stats).unwrap();
    api.register(get_collections).unwrap();
    api.register(get_timeseries).unwrap();
···
        .map_err(|error| format!("failed to start server: {}", error))?
        .await
}
-
-/// awkward helpers
-type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
-fn ok_cors<T: Send + Sync + Serialize + JsonSchema>(t: T) -> OkCorsResponse<T> {
-    let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(t));
-    res.headers_mut()
-        .insert("access-control-allow-origin", "*".parse().unwrap());
-    Ok(res)
-}
ufos/src/server/collections_query.rs  (new file, +72)
···
+use crate::Nsid;
+use async_trait::async_trait;
+use dropshot::{
+    ApiEndpointBodyContentType, ExtractorMetadata, HttpError, Query, RequestContext, ServerContext,
+    SharedExtractor,
+};
+use schemars::JsonSchema;
+use serde::Deserialize;
+use std::collections::HashSet;
+
+/// The real type that gets deserialized
+#[derive(Debug, Deserialize, JsonSchema)]
+pub struct MultiCollectionQuery {
+    pub collection: Vec<String>,
+}
+
+/// The fake corresponding type for docs that dropshot won't freak out about a
+/// vec for
+#[derive(Deserialize, JsonSchema)]
+#[allow(dead_code)]
+struct MultiCollectionQueryForDocs {
+    /// One or more collection [NSID](https://atproto.com/specs/nsid)s
+    ///
+    /// Pass this parameter multiple times to specify multiple collections, like
+    /// `collection=app.bsky.feed.like&collection=app.bsky.feed.post`
+    collection: String,
+}
+
+impl TryFrom<MultiCollectionQuery> for HashSet<Nsid> {
+    type Error = HttpError;
+    fn try_from(mcq: MultiCollectionQuery) -> Result<Self, Self::Error> {
+        let mut out = HashSet::with_capacity(mcq.collection.len());
+        for c in mcq.collection {
+            let nsid = Nsid::new(c).map_err(|e| {
+                HttpError::for_bad_request(
+                    None,
+                    format!("failed to convert collection to an NSID: {e:?}"),
+                )
+            })?;
+            out.insert(nsid);
+        }
+        Ok(out)
+    }
+}
+
+// The `SharedExtractor` implementation for Query<QueryType> describes how to
+// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
+// parsing the query string to an instance of `QueryType`.
+#[async_trait]
+impl SharedExtractor for MultiCollectionQuery {
+    async fn from_request<Context: ServerContext>(
+        ctx: &RequestContext<Context>,
+    ) -> Result<MultiCollectionQuery, HttpError> {
+        let raw_query = ctx.request.uri().query().unwrap_or("");
+        let q = serde_qs::from_str(raw_query).map_err(|e| {
+            HttpError::for_bad_request(None, format!("unable to parse query string: {}", e))
+        })?;
+        Ok(q)
+    }
+
+    fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
+        // HACK: query type switcheroo: passing MultiCollectionQuery to
+        // `metadata` would "helpfully" panic because dropshot believes we can
+        // only have scalar types in a query.
+        //
+        // so instead we have a fake second type whose only job is to look the
+        // same as MultiCollectionQuery except that it has `String` instead of
+        // `Vec<String>`, which dropshot will accept, and generate ~close-enough
+        // docs for.
+        <Query<MultiCollectionQueryForDocs> as SharedExtractor>::metadata(body_content_type)
+    }
+}
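A minimal sketch of what the extractor does with a raw query string, assuming serde_qs 1.0.0-rc.3 folds repeated keys into a `Vec` the way the doc comment above describes:

    // Repeated `collection` keys deserialize into MultiCollectionQuery.collection.
    let q: MultiCollectionQuery =
        serde_qs::from_str("collection=app.bsky.feed.like&collection=app.bsky.feed.post").unwrap();
    assert_eq!(q.collection.len(), 2);
    // TryFrom then validates each entry as an NSID, or yields a 400 HttpError.
    let nsids: std::collections::HashSet<Nsid> = q.try_into().unwrap();
    assert_eq!(nsids.len(), 2);
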
ufos/src/server/cors.rs  (new file, +23)
···
+use dropshot::{HttpError, HttpResponseHeaders, HttpResponseOk};
+use schemars::JsonSchema;
+use serde::Serialize;
+
+pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
+
+/// Helper for constructing Ok responses: return OkCors(T).into()
+/// (not happy with this yet)
+pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
+
+impl<T> From<OkCors<T>> for OkCorsResponse<T>
+where
+    T: Serialize + JsonSchema + Send + Sync,
+{
+    fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
+        let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
+        res.headers_mut()
+            .insert("access-control-allow-origin", "*".parse().unwrap());
+        Ok(res)
+    }
+}
+
+// TODO: cors for HttpError
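Usage sketch for the helper (a hypothetical handler; the wrap-then-convert pattern is exactly how the handlers in server/mod.rs use it):

    // Wrap the payload, then convert; From attaches `access-control-allow-origin: *`.
    async fn get_example(_ctx: RequestContext<Context>) -> OkCorsResponse<String> {
        OkCors("hello".to_string()).into()
    }
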
ufos/src/storage.rs  (+8 -3)
···
use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix};
use crate::{
-    error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, OrderCollectionsBy,
-    UFOsRecord,
+    error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount,
+    OrderCollectionsBy, UFOsRecord,
};
use async_trait::async_trait;
use jetstream::exports::{Did, Nsid};
···
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>;
-    async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>;
+    async fn get_collection_counts(
+        &self,
+        collection: &Nsid,
+        since: HourTruncatedCursor,
+        until: Option<HourTruncatedCursor>,
+    ) -> StorageResult<JustCount>;
    async fn get_records_by_collections(
        &self,
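A call sketch for the revised trait method (`store` and `collection` are stand-in names; the `Cursor::at(...).into()` conversion mirrors the handler code elsewhere in this diff):

    // Count activity over roughly the last day; until = None means "up to now".
    let day_ago = SystemTime::now() - Duration::from_secs(86_400);
    let since: HourTruncatedCursor = Cursor::at(day_ago).into();
    let counts: JustCount = store.get_collection_counts(&collection, since, None).await?;
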
ufos/src/storage_fjall.rs  (+129 -67)
···
    WEEK_IN_MICROS,
};
use crate::{
-    nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount,
+    nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, JustCount, Nsid, NsidCount,
    OrderCollectionsBy, UFOsRecord,
};
use async_trait::async_trait;
···
        Ok((output_hours, output_series))
    }
-    fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
-        // 0. grab a snapshot in case rollups happen while we're working
-        let instant = self.keyspace.instant();
-        let global = self.global.snapshot_at(instant);
-        let rollups = self.rollups.snapshot_at(instant);
+    fn get_collection_counts(
+        &self,
+        collection: &Nsid,
+        since: HourTruncatedCursor,
+        until: Option<HourTruncatedCursor>,
+    ) -> StorageResult<JustCount> {
+        // grab snapshots in case rollups happen while we're working
+        let rollups = self.rollups.snapshot();
-        // 1. all-time counts
-        let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?;
-        let mut total_counts = rollups
-            .get(&all_time_key)?
-            .as_deref()
-            .map(db_complete::<CountsValue>)
-            .transpose()?
-            .unwrap_or_default();
+        let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+        let buckets = CursorBucket::buckets_spanning(since, until);
+        let mut total_counts = CountsValue::default();
-        // 2. live counts that haven't been rolled into all-time yet.
-        let rollup_cursor =
-            get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or(
-                StorageError::BadStateError("Could not find current rollup cursor".to_string()),
-            )?;
-
-        let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
-        for kv in rollups.range(full_range) {
-            let (key_bytes, val_bytes) = kv?;
-            let key = db_complete::<LiveCountsKey>(&key_bytes)?;
-            if key.collection() == collection {
-                let counts = db_complete::<CountsValue>(&val_bytes)?;
-                total_counts.merge(&counts);
-            }
+        for bucket in buckets {
+            let key = match bucket {
+                CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
+                CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
+                CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
+            };
+            let count = rollups
+                .get(&key)?
+                .as_deref()
+                .map(db_complete::<CountsValue>)
+                .transpose()?
+                .unwrap_or_default();
+            total_counts.merge(&count);
        }
-        Ok((
-            total_counts.counts().creates,
-            total_counts.dids().estimate() as u64,
-        ))
+
+        Ok((&total_counts).into())
    }
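    // Reading note (an assumption, since CursorBucket isn't shown in this diff):
    // buckets_spanning presumably covers [since, until] with whole weekly buckets
    // where available and hourly buckets at the edges, so the loop above merges
    // one stored CountsValue per bucket instead of scanning un-rolled-up live counts.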
    fn get_records_by_collections(
···
        })
        .await?
    }
-    async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
+    async fn get_collection_counts(
+        &self,
+        collection: &Nsid,
+        since: HourTruncatedCursor,
+        until: Option<HourTruncatedCursor>,
+    ) -> StorageResult<JustCount> {
        let s = self.clone();
        let collection = collection.clone();
-        tokio::task::spawn_blocking(move || FjallReader::get_counts_by_collection(&s, &collection))
-            .await?
+        tokio::task::spawn_blocking(move || {
+            FjallReader::get_collection_counts(&s, &collection, since, until)
+        })
+        .await?
    }
    async fn get_records_by_collections(
        &self,
···
    const TEST_BATCH_LIMIT: usize = 16;
+    fn beginning() -> HourTruncatedCursor {
+        Cursor::from_start().into()
+    }
    #[derive(Debug, Default)]
    struct TestBatch {
···
    fn test_hello() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();
        write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?;
-        assert_eq!(records, 0);
-        assert_eq!(dids, 0);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("a.b.c".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 0);
+        assert_eq!(dids_estimate, 0);
        Ok(())
···
            100,
        );
        write.insert_batch(batch.batch)?;
+        write.step_rollup()?;
-        let (records, dids) = read.get_counts_by_collection(&collection)?;
-        assert_eq!(records, 1);
-        assert_eq!(dids, 1);
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?;
-        assert_eq!(records, 0);
-        assert_eq!(dids, 0);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(&collection, beginning(), None)?;
+        assert_eq!(creates, 1);
+        assert_eq!(dids_estimate, 1);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("d.e.f".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 0);
+        assert_eq!(dids_estimate, 0);
        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
···
            101,
        );
        write.insert_batch(batch.batch)?;
+        write.step_rollup()?;
-        let (records, dids) = read.get_counts_by_collection(&collection)?;
-        assert_eq!(records, 1);
-        assert_eq!(dids, 1);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(&collection, beginning(), None)?;
+        assert_eq!(creates, 1);
+        assert_eq!(dids_estimate, 1);
        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
···
            101,
        );
        write.insert_batch(batch.batch)?;
+        write.step_rollup()?;
-        let (creates, dids) = read.get_counts_by_collection(&collection)?;
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
-        assert_eq!(dids, 1);
+        assert_eq!(dids_estimate, 1);
        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 0);
···
        write.insert_batch(batch.batch)?;
        // before any rollup
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-        assert_eq!(records, 3);
-        assert_eq!(dids, 2);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("a.a.a".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 0);
+        assert_eq!(dids_estimate, 0);
        // first batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-        assert_eq!(records, 3);
-        assert_eq!(dids, 2);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("a.a.a".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 2);
+        assert_eq!(dids_estimate, 2);
        // delete account rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-        assert_eq!(records, 3);
-        assert_eq!(dids, 2);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("a.a.a".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 2);
+        assert_eq!(dids_estimate, 2);
        // second batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-        assert_eq!(records, 3);
-        assert_eq!(dids, 2);
+        let JustCount {
+            creates,
+            dids_estimate,
+            ..
+        } = read.get_collection_counts(
+            &Nsid::new("a.a.a".to_string()).unwrap(),
+            beginning(),
+            None,
+        )?;
+        assert_eq!(creates, 3);
+        assert_eq!(dids_estimate, 2);
        // no more rollups left
        let (n, _) = write.step_rollup()?;
ufos/src/storage_mem.rs  (file removed, -1757)
···
-use std::ops::Bound;
-use std::sync::Arc;
-
-use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr};
-use crate::error::StorageError;
-use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
-use crate::store_types::{
-    AllTimeRollupKey, CommitCounts, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
-    HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
-    JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
-    NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
-    RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey,
-    TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, WithCollection,
-};
-use crate::{
-    CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord,
-};
-use async_trait::async_trait;
-use jetstream::events::Cursor;
-use lsm_tree::range::prefix_to_range;
-use std::collections::{BTreeMap, HashMap, HashSet};
-use std::path::Path;
-use std::sync::{Mutex, RwLock};
-use std::time::SystemTime;
-
-const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds
-const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
-const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
-
-///
-/// new data format, roughly:
-///
-/// Partion: 'global'
-///
-/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
-///   - key: "js_cursor" (literal)
-///   - val: u64
-///
-/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
-///   - key: "js_endpoint" (literal)
-///   - val: string (URL of the instance)
-///
-/// - Launch date
-///   - key: "takeoff" (literal)
-///   - val: u64 (micros timestamp, not from jetstream for now so not precise)
-///
-/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
-///   - key: "rollup_cursor" (literal)
-///   - val: u64 (tracks behind js_cursor)
-///
-///
-/// Partition: 'feed'
-///
-/// - Per-collection list of record references ordered by jetstream cursor
-///   - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
-///   - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
-///
-///
-/// Partition: 'records'
-///
-/// - Actual records by their atproto location
-///   - key: nullstr || nullstr || nullstr (did, collection, rkey)
-///   - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
-///
-///
-/// Partition: 'rollups'
-///
-/// - Live (batched) records counts and dids estimate per collection
-///   - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
-///   - val: u64 || HLL (count (not cursor), estimator)
-///
-/// - Hourly total record counts and dids estimate per collection
-///   - key: "hourly_counts" || u64 || nullstr (hour, nsid)
-///   - val: u64 || HLL (count (not cursor), estimator)
-///
-/// - Weekly total record counts and dids estimate per collection
-///   - key: "weekly_counts" || u64 || nullstr (hour, nsid)
-///   - val: u64 || HLL (count (not cursor), estimator)
-///
-/// - All-time total record counts and dids estimate per collection
-///   - key: "ever_counts" || nullstr (nsid)
-///   - val: u64 || HLL (count (not cursor), estimator)
-///
-/// - TODO: sorted indexes for all-times?
-///
-///
-/// Partition: 'queues'
-///
-/// - Delete account queue
-///   - key: "delete_acount" || u64 (js_cursor)
-///   - val: nullstr (did)
-///
-///
-/// TODO: moderation actions
-/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
-#[derive(Debug)]
-pub struct MemStorage {}
-
-#[derive(Debug, Default)]
-pub struct MemConfig {
-    /// drop the db when the storage is dropped
-    ///
-    /// this is only meant for tests
-    #[cfg(test)]
-    pub temp: bool,
-}
-
-////////////
-////////////
-////////////
-////////////
-////////////
-////////////
-
-struct BatchSentinel {}
-
-#[derive(Clone)]
-struct MemKeyspace {
-    keyspace_guard: Arc<RwLock<BatchSentinel>>,
-}
-
-impl MemKeyspace {
-    pub fn open() -> Self {
-        Self {
-            keyspace_guard: Arc::new(RwLock::new(BatchSentinel {})),
-        }
-    }
-    pub fn open_partition(&self, _name: &str) -> StorageResult<MemPartion> {
-        Ok(MemPartion {
-            // name: name.to_string(),
-            keyspace_guard: self.keyspace_guard.clone(),
-            contents: Default::default(),
-        })
-    }
-    pub fn batch(&self) -> MemBatch {
-        MemBatch {
-            keyspace_guard: self.keyspace_guard.clone(),
-            tasks: Vec::new(),
-        }
-    }
-    pub fn instant(&self) -> u64 {
-        1
-    }
-}
-
-enum BatchTask {
-    Insert {
-        p: MemPartion,
-        key: Vec<u8>,
-        val: Vec<u8>,
-    },
-    Remove {
-        p: MemPartion,
-        key: Vec<u8>,
-    },
-}
-struct MemBatch {
-    keyspace_guard: Arc<RwLock<BatchSentinel>>,
-    tasks: Vec<BatchTask>,
-}
-impl MemBatch {
-    pub fn insert(&mut self, p: &MemPartion, key: &[u8], val: &[u8]) {
-        self.tasks.push(BatchTask::Insert {
-            p: p.clone(),
-            key: key.to_vec(),
-            val: val.to_vec(),
-        });
-    }
-    pub fn remove(&mut self, p: &MemPartion, key: &[u8]) {
-        self.tasks.push(BatchTask::Remove {
-            p: p.clone(),
-            key: key.to_vec(),
-        });
-    }
-    pub fn len(&self) -> usize {
-        self.tasks.len()
-    }
-    pub fn commit(&mut self) -> StorageResult<()> {
-        let _guard = self.keyspace_guard.write().unwrap();
-        for task in &mut self.tasks {
-            match task {
-                BatchTask::Insert { p, key, val } => p
-                    .contents
-                    .try_lock()
-                    .unwrap()
-                    .insert(key.to_vec(), val.to_vec()),
-                BatchTask::Remove { p, key } => p.contents.try_lock().unwrap().remove(key),
-            };
-        }
-        Ok(())
-    }
-}
-
-#[derive(Clone)]
-struct MemPartion {
-    // name: String,
-    keyspace_guard: Arc<RwLock<BatchSentinel>>,
-    contents: Arc<Mutex<BTreeMap<Vec<u8>, Vec<u8>>>>,
-}
-impl MemPartion {
-    pub fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> {
-        let _guard = self.keyspace_guard.read().unwrap();
-        Ok(self.contents.lock().unwrap().get(key).cloned())
-    }
-    pub fn prefix(&self, pre: &[u8]) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> {
-        // let prefix_bytes = prefix.to_db_bytes()?;
-        let (_, Bound::Excluded(range_end)) = prefix_to_range(pre) else {
-            panic!("bad range thing");
-        };
-
-        return self.range(pre.to_vec()..range_end.to_vec());
-    }
-    pub fn range(&self, r: std::ops::Range<Vec<u8>>) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> {
-        let _guard = self.keyspace_guard.read().unwrap();
-        self.contents
-            .lock()
-            .unwrap()
-            .range(r)
-            .map(|(k, v)| Ok((k.clone(), v.clone())))
-            .collect()
-    }
-    pub fn insert(&self, key: &[u8], val: &[u8]) -> StorageResult<()> {
-        let _guard = self.keyspace_guard.read().unwrap();
-        self.contents
-            .lock()
-            .unwrap()
-            .insert(key.to_vec(), val.to_vec());
-        Ok(())
-    }
-    // pub fn remove(&self, key: &[u8]) -> StorageResult<()> {
-    //     let _guard = self.keyspace_guard.read().unwrap();
-    //     self.contents
-    //         .lock()
-    //         .unwrap()
-    //         .remove(key);
-    //     Ok(())
-    // }
-    pub fn snapshot_at(&self, _instant: u64) -> Self {
-        self.clone()
-    }
-    pub fn snapshot(&self) -> Self {
-        self.clone()
-    }
-}
-
-////////////
-////////////
-////////////
-////////////
-////////////
-////////////
-
-impl StorageWhatever<MemReader, MemWriter, MemBackground, MemConfig> for MemStorage {
-    fn init(
-        _path: impl AsRef<Path>,
-        endpoint: String,
-        force_endpoint: bool,
-        _config: MemConfig,
-    ) -> StorageResult<(MemReader, MemWriter, Option<Cursor>, SketchSecretPrefix)> {
-        let keyspace = MemKeyspace::open();
-
-        let global = keyspace.open_partition("global")?;
-        let feeds = keyspace.open_partition("feeds")?;
-        let records = keyspace.open_partition("records")?;
-        let rollups = keyspace.open_partition("rollups")?;
-        let queues = keyspace.open_partition("queues")?;
-
-        let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
-
-        if js_cursor.is_some() {
-            let stored_endpoint =
-                get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
-
-            let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
-                "found cursor but missing js_endpoint, refusing to start.".to_string(),
-            ))?;
-
-            if stored != endpoint {
-                if force_endpoint {
-                    log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
-                    insert_static_neu::<JetstreamEndpointKey>(
-                        &global,
-                        JetstreamEndpointValue(endpoint.to_string()),
-                    )?;
-                } else {
-                    return Err(StorageError::InitError(format!(
-                        "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start.")));
-                }
-            }
-        } else {
-            insert_static_neu::<JetstreamEndpointKey>(
-                &global,
-                JetstreamEndpointValue(endpoint.to_string()),
-            )?;
-            insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
-            insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
-        }
-
-        let reader = MemReader {
-            keyspace: keyspace.clone(),
-            global: global.clone(),
-            feeds: feeds.clone(),
-            records: records.clone(),
-            rollups: rollups.clone(),
-        };
-        let writer = MemWriter {
-            keyspace,
-            global,
-            feeds,
-            records,
-            rollups,
-            queues,
-        };
-        let secret_prefix = [0u8; 16]; // in-mem store is always deterministic: no secret
-        Ok((reader, writer, js_cursor, secret_prefix))
-    }
-}
-
-type MemRKV = StorageResult<(Vec<u8>, Vec<u8>)>;
-
-#[derive(Clone)]
-pub struct MemReader {
-    keyspace: MemKeyspace,
-    global: MemPartion,
-    feeds: MemPartion,
-    records: MemPartion,
-    rollups: MemPartion,
-}
-
-/// An iterator that knows how to skip over deleted/invalidated records
-struct RecordIterator {
-    db_iter: Box<dyn Iterator<Item = MemRKV>>,
-    records: MemPartion,
-    limit: usize,
-    fetched: usize,
-}
-impl RecordIterator {
-    pub fn new(
-        feeds: &MemPartion,
-        records: MemPartion,
-        collection: &Nsid,
-        limit: usize,
-    ) -> StorageResult<Self> {
-        let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
-        let db_iter = feeds.prefix(&prefix).into_iter().rev();
-        Ok(Self {
-            db_iter: Box::new(db_iter),
-            records,
-            limit,
-            fetched: 0,
-        })
-    }
-    fn get_record(&self, db_next: MemRKV) -> StorageResult<Option<UFOsRecord>> {
-        let (key_bytes, val_bytes) = db_next?;
-        let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
-        let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
-        let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
-
-        let Some(location_val_bytes) = self.records.get(&location_key.to_db_bytes()?)? else {
-            // record was deleted (hopefully)
-            return Ok(None);
-        };
-
-        let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
-
-        if meta.cursor() != feed_key.cursor() {
-            // older/different version
-            return Ok(None);
-        }
-        if meta.rev != feed_val.rev() {
-            // weird...
-            log::warn!("record lookup: cursor match but rev did not...? excluding.");
-            return Ok(None);
-        }
-        let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
-            log::warn!(
-                "record lookup: found record but could not get bytes to decode the record??"
-            );
-            return Ok(None);
-        };
-        let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
-        Ok(Some(UFOsRecord {
-            collection: feed_key.collection().clone(),
-            cursor: feed_key.cursor(),
-            did: feed_val.did().clone(),
-            rkey: feed_val.rkey().clone(),
-            rev: meta.rev.to_string(),
-            record: rawval.try_into()?,
-            is_update: meta.is_update,
-        }))
-    }
-}
-impl Iterator for RecordIterator {
-    type Item = StorageResult<Option<UFOsRecord>>;
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.fetched == self.limit {
-            return Some(Ok(None));
-        }
-        let record = loop {
-            let db_next = self.db_iter.next()?; // None short-circuits here
-            match self.get_record(db_next) {
-                Err(e) => return Some(Err(e)),
-                Ok(Some(record)) => break record,
-                Ok(None) => continue,
-            }
-        };
-        self.fetched += 1;
-        Some(Ok(Some(record)))
-    }
-}
-
-impl MemReader {
-    fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
-        let rollup_cursor =
-            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
-                .map(|c| c.to_raw_u64());
-
-        Ok(serde_json::json!({
-            "rollup_cursor": rollup_cursor,
-        }))
-    }
-
-    fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
-        let global = self.global.snapshot();
-
-        let endpoint =
-            get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
-                .ok_or(StorageError::BadStateError(
-                    "Could not find jetstream endpoint".to_string(),
-                ))?
-                .0;
-
-        let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
-            .ok_or(StorageError::BadStateError(
-                "Could not find jetstream takeoff time".to_string(),
-            ))?
-            .to_raw_u64();
-
-        let latest_cursor =
-            get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
-                .map(|c| c.to_raw_u64());
-
-        let rollup_cursor =
-            get_snapshot_static_neu::<NewRollupCursorKey, JetstreamCursorValue>(&global)?
-                .map(|c| c.to_raw_u64());
-
-        Ok(ConsumerInfo::Jetstream {
-            endpoint,
-            started_at,
-            latest_cursor,
-            rollup_cursor,
-        })
-    }
-
-    fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
-        // 0. grab a snapshot in case rollups happen while we're working
-        let instant = self.keyspace.instant();
-        let global = self.global.snapshot_at(instant);
-        let rollups = self.rollups.snapshot_at(instant);
-
-        // 1. all-time counts
-        let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?;
-        let mut total_counts = rollups
-            .get(&all_time_key)?
-            .as_deref()
-            .map(db_complete::<CountsValue>)
-            .transpose()?
-            .unwrap_or_default();
-
-        // 2. live counts that haven't been rolled into all-time yet.
-        let rollup_cursor =
-            get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or(
-                StorageError::BadStateError("Could not find current rollup cursor".to_string()),
-            )?;
-
-        let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
-        for kv in rollups.range(full_range) {
-            let (key_bytes, val_bytes) = kv?;
-            let key = db_complete::<LiveCountsKey>(&key_bytes)?;
-            if key.collection() == collection {
-                let counts = db_complete::<CountsValue>(&val_bytes)?;
-                total_counts.merge(&counts);
-            }
-        }
-        Ok((
-            total_counts.counts().creates,
-            total_counts.dids().estimate() as u64,
-        ))
-    }
-
-    fn get_records_by_collections(
-        &self,
-        collections: HashSet<Nsid>,
-        limit: usize,
-        _expand_each_collection: bool,
-    ) -> StorageResult<Vec<UFOsRecord>> {
-        if collections.is_empty() {
-            return Ok(vec![]);
-        }
-        let mut record_iterators = Vec::new();
-        for collection in collections {
-            let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
-            record_iterators.push(iter.peekable());
-        }
-        let mut merged = Vec::new();
-        loop {
-            let mut latest: Option<(Cursor, usize)> = None; // ugh
-            for (i, iter) in record_iterators.iter_mut().enumerate() {
-                let Some(it) = iter.peek_mut() else {
-                    continue;
-                };
-                let it = match it {
-                    Ok(v) => v,
-                    Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
-                };
-                let Some(rec) = it else {
-                    break;
-                };
-                if let Some((cursor, _)) = latest {
-                    if rec.cursor > cursor {
-                        latest = Some((rec.cursor, i))
-                    }
-                } else {
-                    latest = Some((rec.cursor, i));
-                }
-            }
-            let Some((_, idx)) = latest else {
-                break;
-            };
-            // yeah yeah whateverrrrrrrrrrrrrrrr
-            merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
-        }
-        Ok(merged)
-    }
-}
-
-#[async_trait]
-impl StoreReader for MemReader {
-    fn name(&self) -> String {
-        "in-memory store".into()
-    }
-    async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
-        let s = self.clone();
-        tokio::task::spawn_blocking(move || MemReader::get_storage_stats(&s)).await?
-    }
-    async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
-        let s = self.clone();
-        tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await?
-    }
-    async fn get_collections(
-        &self,
-        _: usize,
-        _: OrderCollectionsBy,
-        _: Option<HourTruncatedCursor>,
-        _: Option<HourTruncatedCursor>,
-    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
-        todo!()
-    }
-    async fn get_timeseries(
-        &self,
-        _: Vec<Nsid>,
-        _: HourTruncatedCursor,
-        _: Option<HourTruncatedCursor>,
-        _: u64,
-    ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)> {
-        todo!()
-    }
-    async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
-        let s = self.clone();
-        let collection = collection.clone();
-        tokio::task::spawn_blocking(move || MemReader::get_counts_by_collection(&s, &collection))
-            .await?
-    }
-    async fn get_records_by_collections(
-        &self,
-        collections: HashSet<Nsid>,
-        limit: usize,
-        expand_each_collection: bool,
-    ) -> StorageResult<Vec<UFOsRecord>> {
-        let s = self.clone();
-        tokio::task::spawn_blocking(move || {
-            MemReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
-        })
-        .await?
-    }
-}
-
-pub struct MemWriter {
-    keyspace: MemKeyspace,
-    global: MemPartion,
-    feeds: MemPartion,
-    records: MemPartion,
-    rollups: MemPartion,
-    queues: MemPartion,
-}
-
-impl MemWriter {
-    fn rollup_delete_account(
-        &mut self,
-        cursor: Cursor,
-        key_bytes: &[u8],
-        val_bytes: &[u8],
-    ) -> StorageResult<usize> {
-        let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
-        self.delete_account(&did)?;
-        let mut batch = self.keyspace.batch();
-        batch.remove(&self.queues, key_bytes);
-        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
-        batch.commit()?;
-        Ok(1)
-    }
-
-    fn rollup_live_counts(
-        &mut self,
-        timelies: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), StorageError>>,
-        cursor_exclusive_limit: Option<Cursor>,
-        rollup_limit: usize,
-    ) -> StorageResult<usize> {
-        // current strategy is to buffer counts in mem before writing the rollups
-        // we *could* read+write every single batch to rollup.. but their merge is associative so
-        // ...so save the db some work up front? is this worth it? who knows...
-
-        log::warn!("sup!!!");
-
-        #[derive(Eq, Hash, PartialEq)]
-        enum Rollup {
-            Hourly(HourTruncatedCursor),
-            Weekly(WeekTruncatedCursor),
-            AllTime,
-        }
-
-        let mut batch = self.keyspace.batch();
-        let mut cursors_advanced = 0;
-        let mut last_cursor = Cursor::from_start();
-        let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();
-
-        log::warn!("about to loop....");
-        for (i, kv) in timelies.enumerate() {
-            log::warn!("loop {i} {kv:?}...");
-            if i >= rollup_limit {
-                break;
-            }
-
-            let (key_bytes, val_bytes) = kv?;
-            let key = db_complete::<LiveCountsKey>(&key_bytes)
-                .inspect_err(|e| log::warn!("rlc: key: {e:?}"))?;
-
-            if cursor_exclusive_limit
-                .map(|limit| key.cursor() > limit)
-                .unwrap_or(false)
-            {
-                break;
-            }
-
-            batch.remove(&self.rollups, &key_bytes);
-            let val = db_complete::<CountsValue>(&val_bytes)
-                .inspect_err(|e| log::warn!("rlc: val: {e:?}"))?;
-            counts_by_rollup
-                .entry((
-                    key.collection().clone(),
-                    Rollup::Hourly(key.cursor().into()),
-                ))
-                .or_default()
-                .merge(&val);
-            counts_by_rollup
-                .entry((
-                    key.collection().clone(),
-                    Rollup::Weekly(key.cursor().into()),
-                ))
-                .or_default()
-                .merge(&val);
-            counts_by_rollup
-                .entry((key.collection().clone(), Rollup::AllTime))
-                .or_default()
-                .merge(&val);
-
-            cursors_advanced += 1;
-            last_cursor = key.cursor();
-        }
-        log::warn!("done looping. looping cbr counts(?)..");
-
-        for ((nsid, rollup), counts) in counts_by_rollup {
-            log::warn!(
-                "######################## cbr loop {nsid:?} {counts:?} ########################"
-            );
-            let key_bytes = match rollup {
-                Rollup::Hourly(hourly_cursor) => {
-                    let k = HourlyRollupKey::new(hourly_cursor, &nsid);
-                    log::info!("hrly k: {k:?}");
-                    k.to_db_bytes()?
-                }
-                Rollup::Weekly(weekly_cursor) => {
-                    let k = WeeklyRollupKey::new(weekly_cursor, &nsid);
-                    log::info!("weekly k: {k:?}");
-                    k.to_db_bytes()?
-                }
-                Rollup::AllTime => {
-                    let k = AllTimeRollupKey::new(&nsid);
-                    log::info!("alltime k: {k:?}");
-                    k.to_db_bytes()?
-                }
-            };
-            // log::info!("key bytes: {key_bytes:?}");
-            let mut rolled: CountsValue = self
-                .rollups
-                .get(&key_bytes)?
-                .inspect(|v| {
-                    let lax = CountsValue::from_db_bytes(v);
-                    log::info!(
-                        "val: len={}, lax={lax:?} first32={:?}",
-                        v.len(),
-                        v.get(..32)
-                    );
-                })
-                .as_deref()
-                .map(db_complete::<CountsValue>)
-                .transpose()
-                .inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))?
-                .unwrap_or_default();
-
-            // try to round-trip before inserting, for funsies
-            let tripppin = counts.to_db_bytes()?;
-            let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?;
-            assert_eq!(n, tripppin.len());
-            assert_eq!(counts.prefix, and_back.prefix);
-            assert_eq!(counts.dids().estimate(), and_back.dids().estimate());
-            if counts.counts().creates > 20000000 {
-                panic!("COUNTS maybe wtf? {counts:?}")
-            }
-            // assert_eq!(rolled, and_back);
-
-            rolled.merge(&counts);
-
-            // try to round-trip before inserting, for funsies
-            let tripppin = rolled.to_db_bytes()?;
-            let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?;
-            assert_eq!(n, tripppin.len());
-            assert_eq!(rolled.prefix, and_back.prefix);
-            assert_eq!(rolled.dids().estimate(), and_back.dids().estimate());
-            if rolled.counts().creates > 20000000 {
-                panic!("maybe wtf? {rolled:?}")
-            }
-            // assert_eq!(rolled, and_back);
-
-            batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?);
-        }
-
-        log::warn!("done cbr loop.");
-
-        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)
-            .inspect_err(|e| log::warn!("insert neu: {e:?}"))?;
-
-        batch.commit()?;
-
-        log::warn!("ok finished rlc stuff. huh.");
-        Ok(cursors_advanced)
-    }
-}
-
-impl StoreWriter<MemBackground> for MemWriter {
-    fn background_tasks(&mut self, _reroll: bool) -> StorageResult<MemBackground> {
-        Ok(MemBackground {})
-    }
-
-    fn insert_batch<const LIMIT: usize>(
-        &mut self,
-        event_batch: EventBatch<LIMIT>,
-    ) -> StorageResult<()> {
-        if event_batch.is_empty() {
-            return Ok(());
-        }
-
-        let mut batch = self.keyspace.batch();
-
-        // would be nice not to have to iterate everything at once here
-        let latest = event_batch.latest_cursor().unwrap();
-
-        for (nsid, commits) in event_batch.commits_by_nsid {
-            for commit in commits.commits {
-                let location_key: RecordLocationKey = (&commit, &nsid).into();
-
-                match commit.action {
-                    CommitAction::Cut => {
-                        batch.remove(&self.records, &location_key.to_db_bytes()?);
-                    }
-                    CommitAction::Put(put_action) => {
-                        let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
-                        let feed_val: NsidRecordFeedVal =
-                            (&commit.did, &commit.rkey, commit.rev.as_str()).into();
-                        batch.insert(
-                            &self.feeds,
-                            &feed_key.to_db_bytes()?,
-                            &feed_val.to_db_bytes()?,
-                        );
-
-                        let location_val: RecordLocationVal =
-                            (commit.cursor, commit.rev.as_str(), put_action).into();
-                        batch.insert(
-                            &self.records,
-                            &location_key.to_db_bytes()?,
-                            &location_val.to_db_bytes()?,
-                        );
-                    }
-                }
-            }
-            let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-            let counts_value = CountsValue::new(
-                CommitCounts {
-                    creates: commits.creates as u64,
-                    updates: commits.updates as u64,
-                    deletes: commits.deletes as u64,
-                },
-                commits.dids_estimate,
-            );
-            batch.insert(
-                &self.rollups,
-                &live_counts_key.to_db_bytes()?,
-                &counts_value.to_db_bytes()?,
-            );
-        }
-
-        for remove in event_batch.account_removes {
-            let queue_key = DeleteAccountQueueKey::new(remove.cursor);
-            let queue_val: DeleteAccountQueueVal = remove.did;
-            batch.insert(
-                &self.queues,
-                &queue_key.to_db_bytes()?,
-                &queue_val.to_db_bytes()?,
-            );
-        }
-
-        batch.insert(
-            &self.global,
-            &DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
-            &latest.to_db_bytes()?,
-        );
-
-        batch.commit()?;
-        Ok(())
-    }
-
-    fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
-        let mut dirty_nsids = HashSet::new();
-
-        let rollup_cursor =
-            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
-                .ok_or(StorageError::BadStateError(
-                    "Could not find current rollup cursor".to_string(),
-                ))
-                .inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?;
-
-        // timelies
-        let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)
-            .inspect_err(|e| log::warn!("live counts range: {e:?}"))?;
-        let mut timely_iter = self.rollups.range(live_counts_range).into_iter().peekable();
-
-        let timely_next = timely_iter
-            .peek_mut()
-            .map(|kv| -> StorageResult<LiveCountsKey> {
-                match kv {
-                    Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
-                    Ok((key_bytes, _)) => {
-                        let key = db_complete::<LiveCountsKey>(key_bytes).inspect_err(|e| {
-                            log::warn!("failed getting key for next timely: {e:?}")
-                        })?;
-                        Ok(key)
-                    }
-                }
-            })
-            .transpose()
-            .inspect_err(|e| log::warn!("something about timely: {e:?}"))?;
-
-        // delete accounts
-        let delete_accounts_range =
-            DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
-
-        let next_delete = self
-            .queues
-            .range(delete_accounts_range)
-            .into_iter()
-            .next()
-            .transpose()
-            .inspect_err(|e| log::warn!("range for next delete: {e:?}"))?
-            .map(|(key_bytes, val_bytes)| {
-                db_complete::<DeleteAccountQueueKey>(&key_bytes)
-                    .inspect_err(|e| log::warn!("failed inside next delete thing????: {e:?}"))
-                    .map(|k| (k.suffix, key_bytes, val_bytes))
-            })
-            .transpose()
-            .inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?;
-
-        let cursors_stepped = match (timely_next, next_delete) {
-            (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
-                if timely.cursor() < delete_cursor {
-                    let n = self
-                        .rollup_live_counts(
-                            timely_iter,
-                            Some(delete_cursor),
-                            MAX_BATCHED_ROLLUP_COUNTS,
-                        )
-                        .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))?;
-                    dirty_nsids.insert(timely.collection().clone());
-                    n
-                } else {
-                    self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
-                        .inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))?
-                }
-            }
-            (Some(timely), None) => {
-                let n = self
-                    .rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)
-                    .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?;
-                dirty_nsids.insert(timely.collection().clone());
-                n
-            }
-            (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => self
-                .rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
-                .inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?,
-            (None, None) => 0,
-        };
-
-        Ok((cursors_stepped, dirty_nsids))
-    }
-
-    fn trim_collection(
-        &mut self,
-        collection: &Nsid,
-        limit: usize,
-        _full_scan: bool,
-        // TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end)
-    ) -> StorageResult<(usize, usize, bool)> {
-        let mut dangling_feed_keys_cleaned = 0;
-        let mut records_deleted = 0;
-
-        let mut batch = self.keyspace.batch();
-
-        let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
-        let mut found = 0;
-        for kv in self.feeds.prefix(&prefix).into_iter().rev() {
-            let (key_bytes, val_bytes) = kv?;
-            let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
-            let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
-            let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
-            let location_key_bytes = location_key.to_db_bytes()?;
-
-            let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
-                // record was deleted (hopefully)
-                batch.remove(&self.feeds, &location_key_bytes);
-                dangling_feed_keys_cleaned += 1;
-                continue;
-            };
-
-            let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
-
-            if meta.cursor() != feed_key.cursor() {
-                // older/different version
-                batch.remove(&self.feeds, &location_key_bytes);
-                dangling_feed_keys_cleaned += 1;
-                continue;
-            }
-            if meta.rev != feed_val.rev() {
-                // weird...
-                log::warn!("record lookup: cursor match but rev did not...? removing.");
-                batch.remove(&self.feeds, &location_key_bytes);
-                dangling_feed_keys_cleaned += 1;
-                continue;
-            }
-
-            if batch.len() >= MAX_BATCHED_CLEANUP_SIZE {
-                batch.commit()?;
-                batch = self.keyspace.batch();
-            }
-
-            found += 1;
-            if found <= limit {
-                continue;
-            }
-
-            batch.remove(&self.feeds, &location_key_bytes);
-            batch.remove(&self.records, &location_key_bytes);
-            records_deleted += 1;
-        }
-
-        batch.commit()?;
-
-        log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records");
-        Ok((dangling_feed_keys_cleaned, records_deleted, false))
-    }
-
-    fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
-        let mut records_deleted = 0;
-        let mut batch = self.keyspace.batch();
-        let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?;
-        for kv in self.records.prefix(&prefix) {
-            let (key_bytes, _) = kv?;
-            batch.remove(&self.records, &key_bytes);
-            records_deleted += 1;
-            if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
-                batch.commit()?;
-                batch = self.keyspace.batch();
-            }
-        }
-        batch.commit()?;
-        Ok(records_deleted)
-    }
-}
-
-pub struct MemBackground;
-
-#[async_trait]
-impl StoreBackground for MemBackground {
-    async fn run(mut self, _backfill: bool) -> StorageResult<()> {
-        // noop for mem (is there a nicer way to do this?)
-        loop {
-            tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await;
-        }
-    }
-}
-
-/// Get a value from a fixed key
-fn get_static_neu<K: StaticStr, V: DbBytes>(global: &MemPartion) -> StorageResult<Option<V>> {
-    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
-    let value = global
-        .get(&key_bytes)?
-        .map(|value_bytes| db_complete(&value_bytes))
-        .transpose()?;
-    Ok(value)
-}
-
-/// Get a value from a fixed key
-fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
-    global: &MemPartion,
-) -> StorageResult<Option<V>> {
-    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
-    let value = global
-        .get(&key_bytes)?
-        .map(|value_bytes| db_complete(&value_bytes))
-        .transpose()?;
-    Ok(value)
-}
-
-/// Set a value to a fixed key
-fn insert_static_neu<K: StaticStr>(global: &MemPartion, value: impl DbBytes) -> StorageResult<()> {
-    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
-    let value_bytes = value.to_db_bytes()?;
-    global.insert(&key_bytes, &value_bytes)?;
-    Ok(())
-}
-
-/// Set a value to a fixed key
-fn insert_batch_static_neu<K: StaticStr>(
-    batch: &mut MemBatch,
-    global: &MemPartion,
-    value: impl DbBytes,
-) -> StorageResult<()> {
-    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
-    let value_bytes = value.to_db_bytes()?;
-    batch.insert(global, &key_bytes, &value_bytes);
-    Ok(())
-}
-
-#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
-pub struct StorageInfo {
-    pub keyspace_disk_space: u64,
-    pub keyspace_journal_count: usize,
-    pub keyspace_sequence: u64,
-    pub global_approximate_len: usize,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::{DeleteAccount, RecordKey, UFOsCommit};
-    use jetstream::events::{CommitEvent, CommitOp};
-    use jetstream::exports::Cid;
-    use serde_json::value::RawValue;
-
-    fn fjall_db() -> (MemReader, MemWriter) {
-        let (read, write, _, _) = MemStorage::init(
-            tempfile::tempdir().unwrap(),
-            "offline test (no real jetstream endpoint)".to_string(),
-            false,
-            MemConfig { temp: true },
-        )
-        .unwrap();
-        (read, write)
-    }
-
-    const TEST_BATCH_LIMIT: usize = 16;
-
-    #[derive(Debug, Default)]
-    struct TestBatch {
-        pub batch: EventBatch<TEST_BATCH_LIMIT>,
-    }
-
-    impl TestBatch {
-        #[allow(clippy::too_many_arguments)]
-        pub fn create(
-            &mut self,
-            did: &str,
-            collection: &str,
-            rkey: &str,
-            record: &str,
-            rev: Option<&str>,
-            cid: Option<Cid>,
-            cursor: u64,
-        ) -> Nsid {
-            let did = Did::new(did.to_string()).unwrap();
-            let collection = Nsid::new(collection.to_string()).unwrap();
-            let record = RawValue::from_string(record.to_string()).unwrap();
-            let cid = cid.unwrap_or(
-                "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
-                    .parse()
-                    .unwrap(),
-            );
-
-            let event = CommitEvent {
-                collection,
-                rkey: RecordKey::new(rkey.to_string()).unwrap(),
-                rev: rev.unwrap_or("asdf").to_string(),
-                operation: CommitOp::Create,
-                record: Some(record),
-                cid: Some(cid),
-            };
-
-            let (commit, collection) =
-                UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
-                    .unwrap();
-
-            self.batch
-                .commits_by_nsid
-                .entry(collection.clone())
-                .or_default()
-                .truncating_insert(commit, &[0u8; 16])
-                .unwrap();
-
-            collection
-        }
-        #[allow(clippy::too_many_arguments)]
-        pub fn update(
-            &mut self,
-            did: &str,
-            collection: &str,
-            rkey: &str,
-            record: &str,
-            rev: Option<&str>,
-            cid: Option<Cid>,
-            cursor: u64,
-        ) -> Nsid {
-            let did = Did::new(did.to_string()).unwrap();
-            let collection = Nsid::new(collection.to_string()).unwrap();
-            let record = RawValue::from_string(record.to_string()).unwrap();
-            let cid = cid.unwrap_or(
-                "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
-                    .parse()
-                    .unwrap(),
-            );
-
-            let event = CommitEvent {
-                collection,
-                rkey: RecordKey::new(rkey.to_string()).unwrap(),
-                rev: rev.unwrap_or("asdf").to_string(),
-                operation: CommitOp::Update,
-                record: Some(record),
-                cid: Some(cid),
-            };
-
-            let (commit, collection) =
-                UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
-                    .unwrap();
-
-            self.batch
-                .commits_by_nsid
-                .entry(collection.clone())
-                .or_default()
-                .truncating_insert(commit, &[0u8; 16])
-                .unwrap();
-
-            collection
-        }
-        #[allow(clippy::too_many_arguments)]
-        pub fn delete(
-            &mut self,
-            did: &str,
-            collection: &str,
-            rkey: &str,
-            rev: Option<&str>,
-            cursor: u64,
-        ) -> Nsid {
-            let did = Did::new(did.to_string()).unwrap();
-            let collection = Nsid::new(collection.to_string()).unwrap();
-            let event = CommitEvent {
-                collection,
-                rkey: RecordKey::new(rkey.to_string()).unwrap(),
-                rev: rev.unwrap_or("asdf").to_string(),
-                operation: CommitOp::Delete,
-                record: None,
-                cid: None,
-            };
-
-            let (commit, collection) =
-                UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap();
-
-            self.batch
-                .commits_by_nsid
-                .entry(collection.clone())
-                .or_default()
-                .truncating_insert(commit, &[0u8; 16])
-                .unwrap();
-
-            collection
-        }
-        pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did {
-            let did = Did::new(did.to_string()).unwrap();
-            self.batch.account_removes.push(DeleteAccount {
-                did: did.clone(),
-                cursor: Cursor::from_raw_u64(cursor),
-            });
-            did
-        }
-    }
-
-    #[test]
-    fn test_hello() -> anyhow::Result<()> {
-        let (read, mut write) = fjall_db();
-        write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?;
-        assert_eq!(records, 0);
-        assert_eq!(dids, 0);
-        Ok(())
-    }
-
-    #[test]
-    fn test_insert_one() -> anyhow::Result<()> {
-        let (read, mut write) = fjall_db();
-
-        let mut batch = TestBatch::default();
-        let collection = batch.create(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.b.c",
-            "asdf",
-            "{}",
-            Some("rev-z"),
-            None,
-            100,
-        );
-        write.insert_batch(batch.batch)?;
-
-        let (records, dids) = read.get_counts_by_collection(&collection)?;
-        assert_eq!(records, 1);
-        assert_eq!(dids, 1);
-        let (records, dids) =
-            read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?;
-        assert_eq!(records, 0);
-        assert_eq!(dids, 0);
-
-        let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?;
-        assert_eq!(records.len(), 1);
-        let rec = &records[0];
-        assert_eq!(rec.record.get(), "{}");
-        assert!(!rec.is_update);
-
-        let records = read.get_records_by_collections(
-            HashSet::from([Nsid::new("d.e.f".to_string()).unwrap()]),
-            2,
-            false,
-        )?;
-        assert_eq!(records.len(), 0);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_get_multi_collection() -> anyhow::Result<()> {
-        let (read, mut write) = fjall_db();
-
-        let mut batch = TestBatch::default();
-        batch.create(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.a.a",
-            "aaa",
-            r#""earliest""#,
-            Some("rev-a"),
-            None,
-            100,
-        );
-        batch.create(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.a.b",
-            "aab",
-            r#""in between""#,
-            Some("rev-ab"),
-            None,
-            101,
-        );
-        batch.create(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.a.a",
-            "aaa-2",
-            r#""last""#,
-            Some("rev-a-2"),
-            None,
-            102,
-        );
-        write.insert_batch(batch.batch)?;
-
-        let records = read.get_records_by_collections(
-            HashSet::from([
-                Nsid::new("a.a.a".to_string()).unwrap(),
-                Nsid::new("a.a.b".to_string()).unwrap(),
-                Nsid::new("a.a.c".to_string()).unwrap(),
-            ]),
-            100,
-            false,
-        )?;
-        assert_eq!(records.len(), 3);
-        assert_eq!(records[0].record.get(), r#""last""#);
-        assert_eq!(
-            records[0].collection,
-            Nsid::new("a.a.a".to_string()).unwrap()
-        );
-        assert_eq!(records[1].record.get(), r#""in between""#);
-        assert_eq!(
-            records[1].collection,
-            Nsid::new("a.a.b".to_string()).unwrap()
-        );
-        assert_eq!(records[2].record.get(), r#""earliest""#);
-        assert_eq!(
-            records[2].collection,
-            Nsid::new("a.a.a".to_string()).unwrap()
-        );
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_update_one() -> anyhow::Result<()> {
-        let (read, mut write) = fjall_db();
-
-        let mut batch = TestBatch::default();
-        let collection = batch.create(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.b.c",
-            "rkey-asdf",
-            "{}",
-            Some("rev-a"),
-            None,
-            100,
-        );
-        write.insert_batch(batch.batch)?;
-
-        let mut batch = TestBatch::default();
-        batch.update(
-            "did:plc:inze6wrmsm7pjl7yta3oig77",
-            "a.b.c",
-            "rkey-asdf",
-            r#"{"ch": "ch-ch-ch-changes"}"#,
-            Some("rev-z"),
-            None,
-            101,
-        );
-        write.insert_batch(batch.batch)?;
-
-        let (records, dids) = read.get_counts_by_collection(&collection)?;
-        assert_eq!(records, 1);
-        assert_eq!(dids, 1);
-
-        let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?;
-        assert_eq!(records.len(), 1);
-        let rec = &records[0];
-        assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
-        assert!(rec.is_update);
-        Ok(())
-    }
-
-    #[test]
-    fn test_delete_one() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
let collection = batch.create(
-
"did:plc:inze6wrmsm7pjl7yta3oig77",
-
"a.b.c",
-
"rkey-asdf",
-
"{}",
-
Some("rev-a"),
-
None,
-
100,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let mut batch = TestBatch::default();
-
batch.delete(
-
"did:plc:inze6wrmsm7pjl7yta3oig77",
-
"a.b.c",
-
"rkey-asdf",
-
Some("rev-z"),
-
101,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let (records, dids) = read.get_counts_by_collection(&collection)?;
-
assert_eq!(records, 1);
-
assert_eq!(dids, 1);
-
-
let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?;
-
assert_eq!(records.len(), 0);
-
-
Ok(())
-
}
-
-
#[test]
-
fn test_collection_trim() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:inze6wrmsm7pjl7yta3oig77",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
let mut last_b_cursor;
-
for i in 1..=10 {
-
last_b_cursor = 11_000 + i;
-
batch.create(
-
&format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
-
"a.a.b",
-
&format!("rkey-bbb-{i}"),
-
&format!(r#"{{"n": {i}}}"#),
-
Some(&format!("rev-bbb-{i}")),
-
None,
-
last_b_cursor,
-
);
-
}
-
batch.create(
-
"did:plc:inze6wrmsm7pjl7yta3oig77",
-
"a.a.c",
-
"rkey-ccc",
-
"{}",
-
Some("rev-ccc"),
-
None,
-
12_000,
-
);
-
-
write.insert_batch(batch.batch)?;
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 10);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 0);
-
-
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
-
write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
-
write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
-
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 6);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 0);
-
-
Ok(())
-
}
-
-
#[test]
-
fn test_delete_account() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
for i in 1..=2 {
-
batch.create(
-
"did:plc:person-b",
-
"a.a.a",
-
&format!("rkey-bbb-{i}"),
-
&format!(r#"{{"n": {i}}}"#),
-
Some(&format!("rev-bbb-{i}")),
-
None,
-
11_000 + i,
-
);
-
}
-
write.insert_batch(batch.batch)?;
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 3);
-
-
let records_deleted =
-
write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
-
assert_eq!(records_deleted, 2);
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
100,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
-
Ok(())
-
}
-
-
#[test]
-
fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let mut batch = TestBatch::default();
-
batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
-
write.insert_batch(batch.batch)?;
-
-
write.step_rollup()?;
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
1,
-
false,
-
)?;
-
assert_eq!(records.len(), 0);
-
-
Ok(())
-
}
-
-
#[test]
-
fn rollup_delete_live_count_step() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 1);
-
-
let mut batch = TestBatch::default();
-
batch.delete_account("did:plc:person-a", 10_001);
-
write.insert_batch(batch.batch)?;
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
1,
-
false,
-
)?;
-
assert_eq!(records.len(), 1);
-
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 1);
-
-
let records = read.get_records_by_collections(
-
HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
-
1,
-
false,
-
)?;
-
assert_eq!(records.len(), 0);
-
-
let mut batch = TestBatch::default();
-
batch.delete_account("did:plc:person-a", 9_999);
-
write.insert_batch(batch.batch)?;
-
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 0);
-
-
Ok(())
-
}
-
-
#[test]
-
fn rollup_multiple_count_batches() -> anyhow::Result<()> {
-
let (_read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aab",
-
"{}",
-
Some("rev-aab"),
-
None,
-
10_001,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 2);
-
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 0);
-
-
Ok(())
-
}
-
-
#[test]
-
fn counts_before_and_after_rollup() -> anyhow::Result<()> {
-
let (read, mut write) = fjall_db();
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aaa",
-
"{}",
-
Some("rev-aaa"),
-
None,
-
10_000,
-
);
-
batch.create(
-
"did:plc:person-b",
-
"a.a.a",
-
"rkey-bbb",
-
"{}",
-
Some("rev-bbb"),
-
None,
-
10_001,
-
);
-
write.insert_batch(batch.batch)?;
-
-
let mut batch = TestBatch::default();
-
batch.delete_account("did:plc:person-a", 11_000);
-
write.insert_batch(batch.batch)?;
-
-
let mut batch = TestBatch::default();
-
batch.create(
-
"did:plc:person-a",
-
"a.a.a",
-
"rkey-aac",
-
"{}",
-
Some("rev-aac"),
-
None,
-
12_000,
-
);
-
write.insert_batch(batch.batch)?;
-
-
// before any rollup
-
let (records, dids) =
-
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-
assert_eq!(records, 3);
-
assert_eq!(dids, 2);
-
-
// first batch rolled up
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 1);
-
-
let (records, dids) =
-
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-
assert_eq!(records, 3);
-
assert_eq!(dids, 2);
-
-
// delete account rolled up
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 1);
-
-
let (records, dids) =
-
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-
assert_eq!(records, 3);
-
assert_eq!(dids, 2);
-
-
// second batch rolled up
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 1);
-
-
let (records, dids) =
-
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
-
assert_eq!(records, 3);
-
assert_eq!(dids, 2);
-
-
// no more rollups left
-
let (n, _) = write.step_rollup()?;
-
assert_eq!(n, 0);
-
-
Ok(())
-
}
-
}
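
The block above deletes the in-memory store's test suite together with storage_mem itself (see the removal of `pub mod storage_mem;` and the `--in-mem` flag earlier in this diff). As a point of reference only, here is a minimal sketch of how the deleted helper could be retargeted at the fjall backend; notably, the removed helper was already named `fjall_db`, suggesting it was copied between the two suites. `FjallReader`, `FjallWriter`, and the acceptance of a tempdir path are assumptions, not code from this PR:

// Hypothetical sketch, not part of this PR. Assumes FjallStorage::init has
// the shape used in main.rs above: (data dir, jetstream endpoint, force
// flag, config) -> (reader, writer, cursor, sketch secret), and that the
// reader/writer types are named FjallReader/FjallWriter.
fn fjall_db() -> (FjallReader, FjallWriter) {
    let (read, write, _cursor, _sketch_secret) = FjallStorage::init(
        tempfile::tempdir().unwrap(), // throwaway on-disk store for the test
        "offline test (no real jetstream endpoint)".to_string(),
        false,                        // don't force the endpoint to match
        Default::default(),           // assumed default fjall config
    )
    .unwrap();
    (read, write)
}
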
+8 -1
ufos/src/store_types.rs
···
}
impl From<&CountsValue> for JustCount {
fn from(cv: &CountsValue) -> Self {
+
let CommitCounts {
+
creates,
+
updates,
+
deletes,
+
} = cv.counts();
Self {
-
creates: cv.counts().creates,
+
creates,
+
updates,
+
deletes,
dids_estimate: cv.dids().estimate() as u64,
}
}
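
The hunk above switches the `From<&CountsValue>` impl to exhaustively destructure `CommitCounts`, so `updates` and `deletes` flow into `JustCount` alongside `creates`, matching the two fields added to `JustCount` in lib.rs earlier in this diff. A minimal standalone sketch of why the exhaustive pattern beats per-field access, with assumed struct shapes (not the crate's actual definitions):

// Sketch only: CommitCounts and JustCount stand in for the real types.
#[derive(Clone, Copy)]
struct CommitCounts { creates: u64, updates: u64, deletes: u64 }

struct JustCount { creates: u64, updates: u64, deletes: u64, dids_estimate: u64 }

fn just_count(counts: CommitCounts, dids_estimate: u64) -> JustCount {
    // Exhaustive destructuring: if CommitCounts later grows a fourth
    // counter, this pattern becomes a compile error here, instead of the
    // new field being silently dropped from the API response the way a
    // bare `counts.creates` access would allow.
    let CommitCounts { creates, updates, deletes } = counts;
    JustCount { creates, updates, deletes, dids_estimate }
}
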