1use crate::db_types::{
2 db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
3};
4use crate::error::StorageError;
5use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
6use crate::store_types::{
7 AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
8 DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
9 HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
10 JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
11 NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
12 RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
13 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
14 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
15 WEEK_IN_MICROS,
16};
17use crate::{
18 nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
19 NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
20};
21use async_trait::async_trait;
22use fjall::{
23 Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
24};
25use jetstream::events::Cursor;
26use lsm_tree::AbstractTree;
27use metrics::{
28 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
29};
30use std::collections::{HashMap, HashSet};
31use std::iter::Peekable;
32use std::ops::Bound;
33use std::path::Path;
34use std::sync::{
35 atomic::{AtomicBool, Ordering},
36 Arc,
37};
38use std::time::{Duration, Instant, SystemTime};
39
40const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
41const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
42
43///
44/// new data format, roughly:
45///
46/// Partition: 'global'
47///
48/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
49/// - key: "js_cursor" (literal)
50/// - val: u64
51///
52/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
53/// - key: "js_endpoint" (literal)
54/// - val: string (URL of the instance)
55///
56/// - Launch date
57/// - key: "takeoff" (literal)
58/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
59///
60/// - Cardinality estimator secret
61/// - key: "sketch_secret" (literal)
62/// - val: [u8; 16]
63///
64/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
65/// - key: "rollup_cursor" (literal)
66/// - val: u64 (tracks behind js_cursor)
67///
68/// - Feed trim cursor (bg work: delete oldest excess records)
69/// - key: "trim_cursor" || nullstr (nsid)
70/// - val: u64 (earliest previously-removed feed entry jetstream cursor)
71///
72/// Partition: 'feed'
73///
74/// - Per-collection list of record references ordered by jetstream cursor
75/// - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
76/// - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
77///
78///
79/// Partition: 'records'
80///
81/// - Actual records by their atproto location
82/// - key: nullstr || nullstr || nullstr (did, collection, rkey)
83/// - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
84///
85///
86/// Partition: 'rollups'
87///
88/// - Live (batched) records counts and dids estimate per collection
89/// - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
90/// - val: u64 || HLL (count (not cursor), estimator)
91///
92///
93/// - Hourly total record counts and dids estimate per collection
94/// - key: "hourly_counts" || u64 || nullstr (hour, nsid)
95/// - val: u64 || HLL (count (not cursor), estimator)
96///
97/// - Hourly record count ranking
98/// - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
99/// - val: [empty]
100///
101/// - Hourly did estimate ranking
102/// - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
103/// - val: [empty]
104///
105///
106/// - Weekly total record counts and dids estimate per collection
107/// - key: "weekly_counts" || u64 || nullstr (week, nsid)
108/// - val: u64 || HLL (count (not cursor), estimator)
109///
110/// - Weekly record count ranking
111/// - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
112/// - val: [empty]
113///
114/// - Weekly did estimate ranking
115/// - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
116/// - val: [empty]
117///
118///
119/// - All-time total record counts and dids estimate per collection
120/// - key: "ever_counts" || nullstr (nsid)
121/// - val: u64 || HLL (count (not cursor), estimator)
122///
/// - All-time record count ranking
124/// - key: "ever_rank_records" || u64 || nullstr (count, nsid)
125/// - val: [empty]
126///
127/// - All-time did estimate ranking
128/// - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
129/// - val: [empty]
130///
131///
132/// Partition: 'queues'
133///
134/// - Delete account queue
135/// - key: "delete_acount" || u64 (js_cursor)
136/// - val: nullstr (did)
137///
138///
139/// TODO: moderation actions
140/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
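///
/// A rough usage sketch (not from this crate's docs; the path and endpoint below are
/// placeholders):
///
/// ```ignore
/// // Open (or create) the keyspace + partitions, recovering the stored jetstream cursor
/// // and the cardinality-sketch secret if this db has been used before.
/// let (reader, mut writer, cursor, sketch_secret) = FjallStorage::init(
///     "/tmp/ufos-db",                                   // hypothetical path
///     "wss://jetstream.example/subscribe".to_string(),  // hypothetical endpoint
///     false,                                            // don't force an endpoint switch
///     FjallConfig::default(),
/// )?;
///
/// // `writer` ingests event batches and rolls up counts; `reader` serves queries, e.g.:
/// let (counts, _next_cursor) = reader
///     .get_collections(10, OrderCollectionsBy::Lexi { cursor: None }, None, None)
///     .await?;
/// ```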
141#[derive(Debug)]
142pub struct FjallStorage {}
143
144#[derive(Debug, Default)]
145pub struct FjallConfig {
146 /// drop the db when the storage is dropped
147 ///
148 /// this is only meant for tests
149 #[cfg(test)]
150 pub temp: bool,
151}
152
153impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
154 fn init(
155 path: impl AsRef<Path>,
156 endpoint: String,
157 force_endpoint: bool,
158 _config: FjallConfig,
159 ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
160 let keyspace = {
161 let config = Config::new(path);
162
163 // #[cfg(not(test))]
164 // let config = config.fsync_ms(Some(4_000));
165
166 config.open()?
167 };
168
169 let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
170 let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
171 let records = keyspace.open_partition("records", PartitionCreateOptions::default())?;
172 let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?;
173 let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?;
174
175 let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
176
177 let sketch_secret = if js_cursor.is_some() {
178 let stored_endpoint =
179 get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
180 let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
181 "found cursor but missing js_endpoint, refusing to start.".to_string(),
182 ))?;
183
184 let Some(stored_secret) =
185 get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
186 else {
187 return Err(StorageError::InitError(
188 "found cursor but missing sketch_secret, refusing to start.".to_string(),
189 ));
190 };
191
192 if stored != endpoint {
193 if force_endpoint {
194 log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
195 insert_static_neu::<JetstreamEndpointKey>(
196 &global,
197 JetstreamEndpointValue(endpoint.to_string()),
198 )?;
199 } else {
200 return Err(StorageError::InitError(format!(
201 "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
202 }
203 }
204 stored_secret
205 } else {
206 log::info!("initializing a fresh db!");
207 init_static_neu::<JetstreamEndpointKey>(
208 &global,
209 JetstreamEndpointValue(endpoint.to_string()),
210 )?;
211
212 log::info!("generating new secret for cardinality sketches...");
213 let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
214 getrandom::fill(&mut sketch_secret).map_err(|e| {
215 StorageError::InitError(format!(
216 "failed to get a random secret for cardinality sketches: {e:?}"
217 ))
218 })?;
219 init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
220
221 init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
222 init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
223
224 sketch_secret
225 };
226
227 let reader = FjallReader {
228 keyspace: keyspace.clone(),
229 global: global.clone(),
230 feeds: feeds.clone(),
231 records: records.clone(),
232 rollups: rollups.clone(),
233 queues: queues.clone(),
234 };
235 reader.describe_metrics();
236 let writer = FjallWriter {
237 bg_taken: Arc::new(AtomicBool::new(false)),
238 keyspace,
239 global,
240 feeds,
241 records,
242 rollups,
243 queues,
244 };
245 writer.describe_metrics();
246 Ok((reader, writer, js_cursor, sketch_secret))
247 }
248}
249
250type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>;
251
252#[derive(Clone)]
253pub struct FjallReader {
254 keyspace: Keyspace,
255 global: PartitionHandle,
256 feeds: PartitionHandle,
257 records: PartitionHandle,
258 rollups: PartitionHandle,
259 queues: PartitionHandle,
260}
261
262/// An iterator that knows how to skip over deleted/invalidated records
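///
/// A minimal consumption sketch (assuming `feeds`/`records` partition handles and an
/// `nsid` are already in hand); once `limit` records have been yielded the iterator
/// returns `Ok(None)` items:
///
/// ```ignore
/// let iter = RecordIterator::new(&feeds, records.clone(), &nsid, 50)?;
/// for item in iter {
///     match item? {
///         Some(record) => handle(record), // `handle` is a hypothetical consumer
///         None => break,                  // limit reached
///     }
/// }
/// ```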
263struct RecordIterator {
264 db_iter: Box<dyn Iterator<Item = FjallRKV>>,
265 records: PartitionHandle,
266 limit: usize,
267 fetched: usize,
268}
269impl RecordIterator {
270 pub fn new(
271 feeds: &PartitionHandle,
272 records: PartitionHandle,
273 collection: &Nsid,
274 limit: usize,
275 ) -> StorageResult<Self> {
276 let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
277 let db_iter = feeds.prefix(prefix).rev();
278 Ok(Self {
279 db_iter: Box::new(db_iter),
280 records,
281 limit,
282 fetched: 0,
283 })
284 }
285 fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> {
286 let (key_bytes, val_bytes) = db_next?;
287 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
288 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
289 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
290
291 let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
292 // record was deleted (hopefully)
293 return Ok(None);
294 };
295
296 let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
297
298 if meta.cursor() != feed_key.cursor() {
299 // older/different version
300 return Ok(None);
301 }
302 if meta.rev != feed_val.rev() {
303 // weird...
 log::warn!("record lookup: cursor matched but rev did not...? excluding.");
305 return Ok(None);
306 }
307 let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
308 log::warn!(
309 "record lookup: found record but could not get bytes to decode the record??"
310 );
311 return Ok(None);
312 };
313 let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
314 Ok(Some(UFOsRecord {
315 collection: feed_key.collection().clone(),
316 cursor: feed_key.cursor(),
317 did: feed_val.did().clone(),
318 rkey: feed_val.rkey().clone(),
319 rev: meta.rev.to_string(),
320 record: rawval.try_into()?,
321 is_update: meta.is_update,
322 }))
323 }
324}
325impl Iterator for RecordIterator {
326 type Item = StorageResult<Option<UFOsRecord>>;
327 fn next(&mut self) -> Option<Self::Item> {
328 if self.fetched == self.limit {
329 return Some(Ok(None));
330 }
331 let record = loop {
332 let db_next = self.db_iter.next()?; // None short-circuits here
333 match self.get_record(db_next) {
334 Err(e) => return Some(Err(e)),
335 Ok(Some(record)) => break record,
336 Ok(None) => continue,
337 }
338 };
339 self.fetched += 1;
340 Some(Ok(Some(record)))
341 }
342}
343
344type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
345type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
346type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;
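/// Iterate a counts-rollup key range in ascending (lexicographic NSID) order, yielding each
/// collection paired with a lazy getter that only decodes the `CountsValue` when called.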
347fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
348 snapshot: &Snapshot,
349 start: Bound<Vec<u8>>,
350 end: Bound<Vec<u8>>,
351) -> StorageResult<NsidCounter> {
352 Ok(Box::new(snapshot.range((start, end)).map(|kv| {
353 let (k_bytes, v_bytes) = kv?;
354 let key = db_complete::<T>(&k_bytes)?;
355 let nsid = key.collection().clone();
356 let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
357 Ok((nsid, get_counts))
358 })))
359}
360type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
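/// Iterate a rank (secondary index) key range from highest to lowest, yielding each collection
/// paired with a lazy getter that looks up its counts rollup via `get_rollup_key`.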
361fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
362 snapshot: lsm_tree::Snapshot,
363 start: Bound<Vec<u8>>,
364 end: Bound<Vec<u8>>,
365 get_rollup_key: GetRollupKey,
366) -> StorageResult<NsidCounter> {
367 Ok(Box::new(snapshot.range((start, end)).rev().map(
368 move |kv| {
369 let (k_bytes, _) = kv?;
370 let key = db_complete::<T>(&k_bytes)?;
371 let nsid = key.collection().clone();
372 let get_counts: GetCounts = Box::new({
373 let nsid = nsid.clone();
374 let snapshot = snapshot.clone();
375 let get_rollup_key = get_rollup_key.clone();
376 move || {
377 let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
378 "integrity: all-time rank rollup must have corresponding all-time count rollup",
379 );
380 Ok(db_complete::<CountsValue>(&db_count_bytes)?)
381 }
382 });
383 Ok((nsid, get_counts))
384 },
385 )))
386}
387
388type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
389
390impl FjallReader {
391 fn describe_metrics(&self) {
392 describe_gauge!(
393 "storage_fjall_l0_run_count",
394 Unit::Count,
395 "number of L0 runs in a partition"
396 );
397 describe_gauge!(
398 "storage_fjall_keyspace_disk_space",
399 Unit::Bytes,
400 "total storage used according to fjall"
401 );
402 describe_gauge!(
403 "storage_fjall_journal_count",
404 Unit::Count,
405 "total keyspace journals according to fjall"
406 );
407 describe_gauge!(
408 "storage_fjall_keyspace_sequence",
409 Unit::Count,
410 "fjall keyspace sequence"
411 );
412 }
413
414 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
415 let rollup_cursor =
416 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
417 .map(|c| c.to_raw_u64());
418
419 Ok(serde_json::json!({
420 "keyspace_disk_space": self.keyspace.disk_space(),
421 "keyspace_journal_count": self.keyspace.journal_count(),
422 "keyspace_sequence": self.keyspace.instant(),
423 "rollup_cursor": rollup_cursor,
424 }))
425 }
426
427 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
428 let global = self.global.snapshot();
429
430 let endpoint =
431 get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
432 .ok_or(StorageError::BadStateError(
433 "Could not find jetstream endpoint".to_string(),
434 ))?
435 .0;
436
437 let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
438 .ok_or(StorageError::BadStateError(
439 "Could not find jetstream takeoff time".to_string(),
440 ))?
441 .to_raw_u64();
442
443 let latest_cursor =
444 get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
445 .map(|c| c.to_raw_u64());
446
447 let rollup_cursor =
448 get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
449 .map(|c| c.to_raw_u64());
450
451 Ok(ConsumerInfo::Jetstream {
452 endpoint,
453 started_at,
454 latest_cursor,
455 rollup_cursor,
456 })
457 }
458
459 fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
460 let cursor = rollups
461 .unwrap_or(&self.rollups.snapshot())
462 .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
463 .next()
464 .transpose()?
465 .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
466 .transpose()?
467 .map(|key| key.cursor())
468 .unwrap_or_else(|| Cursor::from_start().into());
469 Ok(cursor)
470 }
471
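 /// Page through collections alphabetically by k-way-merging the per-bucket iterators:
 /// each round takes the lowest NSID currently peeked across all buckets, merges that
 /// NSID's counts from every bucket, and the last NSID emitted becomes the next cursor.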
472 fn get_lexi_collections(
473 &self,
474 snapshot: Snapshot,
475 limit: usize,
476 cursor: Option<Vec<u8>>,
477 buckets: Vec<CursorBucket>,
478 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
479 let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
480 let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
481 for bucket in &buckets {
482 let it: NsidCounter = match bucket {
483 CursorBucket::Hour(t) => {
484 let start = cursor_nsid
485 .as_ref()
486 .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
487 .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
488 let end = HourlyRollupKey::end(*t)?;
489 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
490 }
491 CursorBucket::Week(t) => {
492 let start = cursor_nsid
493 .as_ref()
494 .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
495 .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
496 let end = WeeklyRollupKey::end(*t)?;
497 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
498 }
499 CursorBucket::AllTime => {
500 let start = cursor_nsid
501 .as_ref()
502 .map(AllTimeRollupKey::after_nsid)
503 .unwrap_or_else(AllTimeRollupKey::start)?;
504 let end = AllTimeRollupKey::end()?;
505 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
506 }
507 };
508 iters.push(it.peekable());
509 }
510
511 let mut out = Vec::new();
512 let mut current_nsid = None;
513 for _ in 0..limit {
514 // double-scan the iters for each element: this could be eliminated but we're starting simple.
515 // first scan: find the lowest nsid
516 // second scan: take + merge, and advance all iters with lowest nsid
517 let mut lowest: Option<Nsid> = None;
518 for iter in &mut iters {
519 if let Some(bla) = iter.peek_mut() {
520 let (nsid, _) = match bla {
521 Ok(v) => v,
522 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
523 };
524 lowest = match lowest {
525 Some(ref current) if nsid.as_str() > current.as_str() => lowest,
526 _ => Some(nsid.clone()),
527 };
528 }
529 }
530 current_nsid = lowest.clone();
531 let Some(nsid) = lowest else { break };
532
533 let mut merged = CountsValue::default();
534 for iter in &mut iters {
 // unwrap: any fjall error was already checked (and bailed on) when peeking in the first loop
536 if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
537 let counts = get_counts()?;
538 merged.merge(&counts);
539 }
540 }
541 out.push(NsidCount::new(&nsid, &merged));
542 }
543
544 let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
545 Ok((out, next_cursor))
546 }
547
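 /// Rank collections over the requested buckets: walk each bucket's rank index from the top,
 /// overfetch a little past `limit`, merge counts per NSID, then sort and truncate. As noted
 /// in the comments below, this is an approximation rather than an exact top-N.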
548 fn get_ordered_collections(
549 &self,
550 snapshot: Snapshot,
551 limit: usize,
552 order: OrderCollectionsBy,
553 buckets: Vec<CursorBucket>,
554 ) -> StorageResult<Vec<NsidCount>> {
555 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
556
557 for bucket in buckets {
558 let it: NsidCounter = match (&order, bucket) {
559 (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
560 get_lookup_iter::<HourlyRecordsKey>(
561 snapshot.clone(),
562 HourlyRecordsKey::start(t)?,
563 HourlyRecordsKey::end(t)?,
564 Arc::new({
565 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
566 }),
567 )?
568 }
569 (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
570 get_lookup_iter::<HourlyDidsKey>(
571 snapshot.clone(),
572 HourlyDidsKey::start(t)?,
573 HourlyDidsKey::end(t)?,
574 Arc::new({
575 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
576 }),
577 )?
578 }
579 (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
580 get_lookup_iter::<WeeklyRecordsKey>(
581 snapshot.clone(),
582 WeeklyRecordsKey::start(t)?,
583 WeeklyRecordsKey::end(t)?,
584 Arc::new({
585 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
586 }),
587 )?
588 }
589 (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
590 get_lookup_iter::<WeeklyDidsKey>(
591 snapshot.clone(),
592 WeeklyDidsKey::start(t)?,
593 WeeklyDidsKey::end(t)?,
594 Arc::new({
595 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
596 }),
597 )?
598 }
599 (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
600 get_lookup_iter::<AllTimeRecordsKey>(
601 snapshot.clone(),
602 AllTimeRecordsKey::start()?,
603 AllTimeRecordsKey::end()?,
604 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
605 )?
606 }
607 (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
608 get_lookup_iter::<AllTimeDidsKey>(
609 snapshot.clone(),
610 AllTimeDidsKey::start()?,
611 AllTimeDidsKey::end()?,
612 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
613 )?
614 }
615 (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
616 };
617 iters.push(it);
618 }
619
620 // overfetch by taking a bit more than the limit
621 // merge by collection
622 // sort by requested order, take limit, discard all remaining
623 //
624 // this isn't guaranteed to be correct, but it will hopefully be close most of the time:
625 // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
 // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but it's not a guarantee
627 //
628 // this thing is heavy, there's probably a better way
629 let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
630 for iter in iters {
631 for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
632 let (nsid, get_counts) = pair?;
633 let counts = get_counts()?;
634 ranked.entry(nsid).or_default().merge(&counts);
635 }
636 }
637 let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
638 match order {
639 OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
640 OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
641 OrderCollectionsBy::Lexi { .. } => unreachable!(),
642 }
643 let counts = ranked
644 .into_iter()
645 .rev()
646 .take(limit)
647 .map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
648 .collect();
649 Ok(counts)
650 }
651
652 fn get_collections(
653 &self,
654 limit: usize,
655 order: OrderCollectionsBy,
656 since: Option<HourTruncatedCursor>,
657 until: Option<HourTruncatedCursor>,
658 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
659 let snapshot = self.rollups.snapshot();
660 let buckets = if let (None, None) = (since, until) {
661 vec![CursorBucket::AllTime]
662 } else {
663 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
664 if let Some(specified) = since {
665 if specified > lower {
666 lower = specified;
667 }
668 }
669 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
670 CursorBucket::buckets_spanning(lower, upper)
671 };
672 match order {
673 OrderCollectionsBy::Lexi { cursor } => {
674 self.get_lexi_collections(snapshot, limit, cursor, buckets)
675 }
676 _ => Ok((
677 self.get_ordered_collections(snapshot, limit, order, buckets)?,
678 None,
679 )),
680 }
681 }
682
683 fn get_lexi_prefix(
684 &self,
685 snapshot: Snapshot,
686 prefix: NsidPrefix,
687 limit: usize,
688 cursor: Option<Vec<u8>>,
689 buckets: Vec<CursorBucket>,
690 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
691 // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
692 let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
693 let cursor_child = cursor
694 .as_deref()
695 .map(|encoded_bytes| {
696 let decoded: String = db_complete(encoded_bytes)?;
 // TODO: write some tests for cursors, there are probably bugs here
698 let as_sub_prefix_with_null = decoded.to_db_bytes()?;
699 Ok::<_, EncodingError>(as_sub_prefix_with_null)
700 })
701 .transpose()?;
702 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
703 for bucket in &buckets {
704 let it: NsidCounter = match bucket {
705 CursorBucket::Hour(t) => {
706 let start = cursor_child
707 .as_ref()
708 .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
709 .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
710 let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
711 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
712 }
713 CursorBucket::Week(t) => {
714 let start = cursor_child
715 .as_ref()
716 .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
717 .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
718 let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
719 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
720 }
721 CursorBucket::AllTime => {
722 let start = cursor_child
723 .as_ref()
724 .map(|child| AllTimeRollupKey::after_nsid_prefix(child))
725 .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
726 let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
727 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
728 }
729 };
730 iters.push(it);
731 }
732
733 // with apologies
734 let mut iters: Vec<_> = iters
735 .into_iter()
736 .map(|it| {
737 it.map(|bla| {
738 bla.map(|(nsid, v)| {
739 let Some(child) = Child::from_prefix(&nsid, &prefix) else {
740 panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
741 };
742 (child, v)
743 })
744 })
745 .peekable()
746 })
747 .collect();
748
749 let mut items = Vec::new();
750 let mut prefix_count = CountsValue::default();
751 #[derive(Debug, Clone, PartialEq)]
752 enum Child {
753 FullNsid(Nsid),
754 ChildPrefix(String),
755 }
756 impl Child {
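 // A sketch of the mapping (NSIDs are illustrative): with prefix "app.bsky",
 // "app.bsky.feed.post" maps to ChildPrefix("app.bsky.feed"), while an NSID sitting
 // directly in the prefix's group (e.g. "app.bsky.feed") is kept whole as FullNsid.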
757 fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
758 if prefix.is_group_of(nsid) {
759 return Some(Child::FullNsid(nsid.clone()));
760 }
761 let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
762 let (segment, _) = suffix.split_once('.').unwrap();
763 let child_prefix = format!("{}.{segment}", prefix.0);
764 Some(Child::ChildPrefix(child_prefix))
765 }
766 fn is_before(&self, other: &Child) -> bool {
767 match (self, other) {
768 (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
769 (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
770 (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
771 (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
772 (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
773 (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
774 }
775 }
776 fn into_inner(self) -> String {
777 match self {
778 Child::FullNsid(s) => s.to_string(),
779 Child::ChildPrefix(s) => s,
780 }
781 }
782 }
783 let mut current_child: Option<Child> = None;
784 for _ in 0..limit {
785 // double-scan the iters for each element: this could be eliminated but we're starting simple.
786 // first scan: find the lowest nsid
787 // second scan: take + merge, and advance all iters with lowest nsid
788 let mut lowest: Option<Child> = None;
789 for iter in &mut iters {
790 if let Some(bla) = iter.peek_mut() {
791 let (child, _) = match bla {
792 Ok(v) => v,
793 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
794 };
795
796 lowest = match lowest {
797 Some(ref current) if current.is_before(child) => lowest,
798 _ => Some(child.clone()),
799 };
800 }
801 }
802 current_child = lowest.clone();
803 let Some(child) = lowest else { break };
804
805 let mut merged = CountsValue::default();
806 for iter in &mut iters {
 // unwrap: any fjall error was already checked (and bailed on) when peeking in the first loop
808 while let Some(Ok((_, get_counts))) =
809 iter.next_if(|v| v.as_ref().unwrap().0 == child)
810 {
811 let counts = get_counts()?;
812 prefix_count.merge(&counts);
813 merged.merge(&counts);
814 }
815 }
816 items.push(match child {
817 Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
818 Child::ChildPrefix(prefix) => {
819 PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
820 }
821 });
822 }
823
824 // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
825 // ....er the sketch is probably too big
826 // TODO: this is probably buggy on child-type boundaries bleh
827 let next_cursor = current_child
828 .map(|s| s.into_inner().to_db_bytes())
829 .transpose()?;
830
831 Ok(((&prefix_count).into(), items, next_cursor))
832 }
833
834 fn get_prefix(
835 &self,
836 prefix: NsidPrefix,
837 limit: usize,
838 order: OrderCollectionsBy,
839 since: Option<HourTruncatedCursor>,
840 until: Option<HourTruncatedCursor>,
841 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
842 let snapshot = self.rollups.snapshot();
843 let buckets = if let (None, None) = (since, until) {
844 vec![CursorBucket::AllTime]
845 } else {
846 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
847 if let Some(specified) = since {
848 if specified > lower {
849 lower = specified;
850 }
851 }
852 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
853 CursorBucket::buckets_spanning(lower, upper)
854 };
855 match order {
856 OrderCollectionsBy::Lexi { cursor } => {
857 self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
858 }
859 _ => todo!(),
860 }
861 }
862
863 /// - step: output series time step, in seconds
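 ///
 /// Worked example (assuming `HOUR_IN_MICROS` is one hour in microseconds): `step = 86_400`
 /// seconds gives `step_hours = 86_400 / 3_600 = 24`, so each output point merges 24 hourly
 /// `CountsValue`s and `output_hours` records the first hour of each 24-hour chunk.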
864 fn get_timeseries(
865 &self,
866 collections: Vec<Nsid>,
867 since: HourTruncatedCursor,
868 until: Option<HourTruncatedCursor>,
869 step: u64,
870 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
871 if step > WEEK_IN_MICROS {
872 panic!("week-stepping is todo");
873 }
874 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
875 let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
876 return Ok((
877 // empty: until < since
878 vec![],
879 collections.into_iter().map(|c| (c, vec![])).collect(),
880 ));
881 };
882 let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
883 let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
884 let snapshot = self.rollups.snapshot();
885 for hour in (0..n_hours).map(|i| since.nth_next(i)) {
886 let mut counts = Vec::with_capacity(collections.len());
887 for nsid in &collections {
888 let count = snapshot
889 .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
890 .as_deref()
891 .map(db_complete::<CountsValue>)
892 .transpose()?
893 .unwrap_or_default();
894 counts.push(count);
895 }
896 counts_by_hour.push((hour, counts));
897 }
898
899 let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
900 let mut output_hours = Vec::with_capacity(step_hours as usize);
901 let mut output_series: CollectionSerieses = collections
902 .iter()
903 .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
904 .collect();
905
906 for chunk in counts_by_hour.chunks(step_hours as usize) {
 output_hours.push(chunk[0].0); // chunks() never yields an empty slice, so indexing [0] is safe
908 for (i, collection) in collections.iter().enumerate() {
909 let mut c = CountsValue::default();
910 for (_, counts) in chunk {
911 c.merge(&counts[i]);
912 }
913 output_series
914 .get_mut(collection)
915 .expect("output series is initialized with all collections")
916 .push(c);
917 }
918 }
919
920 Ok((output_hours, output_series))
921 }
922
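 /// Total counts for one collection over `[since, until]`: merge the hourly/weekly rollup
 /// values for every bucket spanning the window (no all-time shortcut yet; see the TODO below).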
923 fn get_collection_counts(
924 &self,
925 collection: &Nsid,
926 since: HourTruncatedCursor,
927 until: Option<HourTruncatedCursor>,
928 ) -> StorageResult<JustCount> {
929 // grab snapshots in case rollups happen while we're working
930 let rollups = self.rollups.snapshot();
931
932 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
933 let buckets = CursorBucket::buckets_spanning(since, until);
934 let mut total_counts = CountsValue::default();
935
936 for bucket in buckets {
937 let key = match bucket {
938 CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
939 CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
940 CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
941 };
942 let count = rollups
943 .get(&key)?
944 .as_deref()
945 .map(db_complete::<CountsValue>)
946 .transpose()?
947 .unwrap_or_default();
948 total_counts.merge(&count);
949 }
950
951 Ok((&total_counts).into())
952 }
953
954 fn get_records_by_collections(
955 &self,
956 collections: HashSet<Nsid>,
957 limit: usize,
958 expand_each_collection: bool,
959 ) -> StorageResult<Vec<UFOsRecord>> {
960 if collections.is_empty() {
961 return Ok(vec![]);
962 }
963 let mut record_iterators = Vec::new();
964 for collection in collections {
965 let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
966 record_iterators.push(iter.peekable());
967 }
968 let mut merged = Vec::new();
969 loop {
 let mut latest: Option<(Cursor, usize)> = None; // (newest cursor seen so far, iterator index)
971 for (i, iter) in record_iterators.iter_mut().enumerate() {
972 let Some(it) = iter.peek_mut() else {
973 continue;
974 };
975 let it = match it {
976 Ok(v) => v,
977 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
978 };
979 let Some(rec) = it else {
980 if expand_each_collection {
981 continue;
982 } else {
983 break;
984 }
985 };
986 if let Some((cursor, _)) = latest {
987 if rec.cursor > cursor {
988 latest = Some((rec.cursor, i))
989 }
990 } else {
991 latest = Some((rec.cursor, i));
992 }
993 }
994 let Some((_, idx)) = latest else {
995 break;
996 };
 // unwraps are safe: this iterator was just peeked and held Ok(Some(_))
998 merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
999 }
1000 Ok(merged)
1001 }
1002
1003 fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
1004 let start = AllTimeRollupKey::start()?;
1005 let end = AllTimeRollupKey::end()?;
1006 let mut matches = Vec::new();
1007 let limit = 16; // TODO: param
1008 for kv in self.rollups.range((start, end)) {
1009 let (key_bytes, val_bytes) = kv?;
1010 let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
1011 let nsid = key.collection();
1012 for term in &terms {
1013 if nsid.contains(term) {
1014 let counts = db_complete::<CountsValue>(&val_bytes)?;
1015 matches.push(NsidCount::new(nsid, &counts));
1016 break;
1017 }
1018 }
1019 if matches.len() >= limit {
1020 break;
1021 }
1022 }
1023 // TODO: indicate incomplete results
1024 Ok(matches)
1025 }
1026}
1027
1028#[async_trait]
1029impl StoreReader for FjallReader {
1030 fn name(&self) -> String {
1031 "fjall storage v2".into()
1032 }
1033 fn update_metrics(&self) {
1034 gauge!("storage_fjall_l0_run_count", "partition" => "global")
1035 .set(self.global.tree.l0_run_count() as f64);
1036 gauge!("storage_fjall_l0_run_count", "partition" => "feeds")
1037 .set(self.feeds.tree.l0_run_count() as f64);
1038 gauge!("storage_fjall_l0_run_count", "partition" => "records")
1039 .set(self.records.tree.l0_run_count() as f64);
1040 gauge!("storage_fjall_l0_run_count", "partition" => "rollups")
1041 .set(self.rollups.tree.l0_run_count() as f64);
1042 gauge!("storage_fjall_l0_run_count", "partition" => "queues")
1043 .set(self.queues.tree.l0_run_count() as f64);
1044 gauge!("storage_fjall_keyspace_disk_space").set(self.keyspace.disk_space() as f64);
1045 gauge!("storage_fjall_journal_count").set(self.keyspace.journal_count() as f64);
1046 gauge!("storage_fjall_keyspace_sequence").set(self.keyspace.instant() as f64);
1047 }
1048 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1049 let s = self.clone();
1050 tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await?
1051 }
1052 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
1053 let s = self.clone();
1054 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
1055 }
1056 async fn get_collections(
1057 &self,
1058 limit: usize,
1059 order: OrderCollectionsBy,
1060 since: Option<HourTruncatedCursor>,
1061 until: Option<HourTruncatedCursor>,
1062 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
1063 let s = self.clone();
1064 tokio::task::spawn_blocking(move || {
1065 FjallReader::get_collections(&s, limit, order, since, until)
1066 })
1067 .await?
1068 }
1069 async fn get_prefix(
1070 &self,
1071 prefix: NsidPrefix,
1072 limit: usize,
1073 order: OrderCollectionsBy,
1074 since: Option<HourTruncatedCursor>,
1075 until: Option<HourTruncatedCursor>,
1076 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
1077 let s = self.clone();
1078 tokio::task::spawn_blocking(move || {
1079 FjallReader::get_prefix(&s, prefix, limit, order, since, until)
1080 })
1081 .await?
1082 }
1083 async fn get_timeseries(
1084 &self,
1085 collections: Vec<Nsid>,
1086 since: HourTruncatedCursor,
1087 until: Option<HourTruncatedCursor>,
1088 step: u64,
1089 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
1090 let s = self.clone();
1091 tokio::task::spawn_blocking(move || {
1092 FjallReader::get_timeseries(&s, collections, since, until, step)
1093 })
1094 .await?
1095 }
1096 async fn get_collection_counts(
1097 &self,
1098 collection: &Nsid,
1099 since: HourTruncatedCursor,
1100 until: Option<HourTruncatedCursor>,
1101 ) -> StorageResult<JustCount> {
1102 let s = self.clone();
1103 let collection = collection.clone();
1104 tokio::task::spawn_blocking(move || {
1105 FjallReader::get_collection_counts(&s, &collection, since, until)
1106 })
1107 .await?
1108 }
1109 async fn get_records_by_collections(
1110 &self,
1111 collections: HashSet<Nsid>,
1112 limit: usize,
1113 expand_each_collection: bool,
1114 ) -> StorageResult<Vec<UFOsRecord>> {
1115 let s = self.clone();
1116 tokio::task::spawn_blocking(move || {
1117 FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
1118 })
1119 .await?
1120 }
1121 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
1122 let s = self.clone();
1123 tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
1124 }
1125}
1126
1127#[derive(Clone)]
1128pub struct FjallWriter {
1129 bg_taken: Arc<AtomicBool>,
1130 keyspace: Keyspace,
1131 global: PartitionHandle,
1132 feeds: PartitionHandle,
1133 records: PartitionHandle,
1134 rollups: PartitionHandle,
1135 queues: PartitionHandle,
1136}
1137
1138impl FjallWriter {
1139 fn describe_metrics(&self) {
1140 describe_histogram!(
1141 "storage_insert_batch_db_batch_items",
1142 Unit::Count,
1143 "how many items are in the fjall batch for batched inserts"
1144 );
1145 describe_histogram!(
1146 "storage_rollup_counts_db_batch_items",
1147 Unit::Count,
1148 "how many items are in the fjall batch for a timlies rollup"
1149 );
1150 describe_counter!(
1151 "storage_delete_account_partial_commits",
1152 Unit::Count,
1153 "fjall checkpoint commits for cleaning up accounts with too many records"
1154 );
1155 describe_counter!(
1156 "storage_delete_account_completions",
1157 Unit::Count,
1158 "total count of account deletes handled"
1159 );
1160 describe_counter!(
1161 "storage_delete_account_records_deleted",
1162 Unit::Count,
1163 "total records deleted when handling account deletes"
1164 );
1165 describe_histogram!(
1166 "storage_trim_dirty_nsids",
1167 Unit::Count,
1168 "number of NSIDs trimmed"
1169 );
1170 describe_histogram!(
1171 "storage_trim_duration",
1172 Unit::Microseconds,
1173 "how long it took to trim the dirty NSIDs"
1174 );
1175 describe_counter!(
1176 "storage_trim_removed",
1177 Unit::Count,
1178 "how many records were removed during trim"
1179 );
1180 }
1181 fn rollup_delete_account(
1182 &mut self,
1183 cursor: Cursor,
1184 key_bytes: &[u8],
1185 val_bytes: &[u8],
1186 ) -> StorageResult<usize> {
1187 let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
1188 self.delete_account(&did)?;
1189 let mut batch = self.keyspace.batch();
1190 batch.remove(&self.queues, key_bytes);
1191 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
1192 batch.commit()?;
1193 Ok(1)
1194 }
1195
1196 fn rollup_live_counts(
1197 &mut self,
1198 timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
1199 cursor_exclusive_limit: Option<Cursor>,
1200 rollup_limit: usize,
1201 ) -> StorageResult<(usize, HashSet<Nsid>)> {
 // current strategy: buffer counts in memory before writing the rollups.
 // we *could* read+write every single batch straight into the rollups, but their merge is
 // associative, so we save the db some work up front. is this worth it? who knows...
1205
1206 let mut dirty_nsids = HashSet::new();
1207
1208 #[derive(Eq, Hash, PartialEq)]
1209 enum Rollup {
1210 Hourly(HourTruncatedCursor),
1211 Weekly(WeekTruncatedCursor),
1212 AllTime,
1213 }
1214
1215 let mut batch = self.keyspace.batch();
1216 let mut cursors_advanced = 0;
1217 let mut last_cursor = Cursor::from_start();
1218 let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();
1219
1220 for (i, kv) in timelies.enumerate() {
1221 if i >= rollup_limit {
1222 break;
1223 }
1224
1225 let (key_bytes, val_bytes) = kv?;
1226 let key = db_complete::<LiveCountsKey>(&key_bytes)?;
1227
1228 if cursor_exclusive_limit
1229 .map(|limit| key.cursor() > limit)
1230 .unwrap_or(false)
1231 {
1232 break;
1233 }
1234
1235 dirty_nsids.insert(key.collection().clone());
1236
1237 batch.remove(&self.rollups, key_bytes);
1238 let val = db_complete::<CountsValue>(&val_bytes)?;
1239 counts_by_rollup
1240 .entry((
1241 key.collection().clone(),
1242 Rollup::Hourly(key.cursor().into()),
1243 ))
1244 .or_default()
1245 .merge(&val);
1246 counts_by_rollup
1247 .entry((
1248 key.collection().clone(),
1249 Rollup::Weekly(key.cursor().into()),
1250 ))
1251 .or_default()
1252 .merge(&val);
1253 counts_by_rollup
1254 .entry((key.collection().clone(), Rollup::AllTime))
1255 .or_default()
1256 .merge(&val);
1257
1258 cursors_advanced += 1;
1259 last_cursor = key.cursor();
1260 }
1261
1262 // go through each new rollup thing and merge it with whatever might already be in the db
1263 for ((nsid, rollup), counts) in counts_by_rollup {
1264 let rollup_key_bytes = match rollup {
1265 Rollup::Hourly(hourly_cursor) => {
1266 HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
1267 }
1268 Rollup::Weekly(weekly_cursor) => {
1269 WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
1270 }
1271 Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
1272 };
1273 let mut rolled: CountsValue = self
1274 .rollups
1275 .get(&rollup_key_bytes)?
1276 .as_deref()
1277 .map(db_complete::<CountsValue>)
1278 .transpose()?
1279 .unwrap_or_default();
1280
 // now that we have values, we know the existing ranks
1282 let before_creates_count = rolled.counts().creates;
1283 let before_dids_estimate = rolled.dids().estimate() as u64;
1284
1285 // update the rollup
1286 rolled.merge(&counts);
1287
1288 // new ranks
1289 let new_creates_count = rolled.counts().creates;
1290 let new_dids_estimate = rolled.dids().estimate() as u64;
1291
1292 // update create-ranked secondary index if rank changed
1293 if new_creates_count != before_creates_count {
1294 let (old_k, new_k) = match rollup {
1295 Rollup::Hourly(cursor) => (
1296 HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1297 .to_db_bytes()?,
1298 HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1299 .to_db_bytes()?,
1300 ),
1301 Rollup::Weekly(cursor) => (
1302 WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1303 .to_db_bytes()?,
1304 WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1305 .to_db_bytes()?,
1306 ),
1307 Rollup::AllTime => (
1308 AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
1309 AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
1310 ),
1311 };
1312 // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
1313 batch.remove_weak(&self.rollups, &old_k);
1314 batch.insert(&self.rollups, &new_k, "");
1315 }
1316
1317 // update dids-ranked secondary index if rank changed
1318 if new_dids_estimate != before_dids_estimate {
1319 let (old_k, new_k) = match rollup {
1320 Rollup::Hourly(cursor) => (
1321 HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1322 .to_db_bytes()?,
1323 HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1324 .to_db_bytes()?,
1325 ),
1326 Rollup::Weekly(cursor) => (
1327 WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1328 .to_db_bytes()?,
1329 WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1330 .to_db_bytes()?,
1331 ),
1332 Rollup::AllTime => (
1333 AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
1334 AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
1335 ),
1336 };
1337 // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
1338 batch.remove_weak(&self.rollups, &old_k);
1339 batch.insert(&self.rollups, &new_k, "");
1340 }
1341
1342 // replace the main counts rollup
1343 batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
1344 }
1345
1346 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
1347
1348 histogram!("storage_rollup_counts_db_batch_items").record(batch.len() as f64);
1349 batch.commit()?;
1350 Ok((cursors_advanced, dirty_nsids))
1351 }
1352}
1353
1354impl StoreWriter<FjallBackground> for FjallWriter {
1355 fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
1356 if self.bg_taken.swap(true, Ordering::SeqCst) {
1357 return Err(StorageError::BackgroundAlreadyStarted);
1358 }
1359 if reroll {
1360 log::info!("reroll: resetting rollup cursor...");
1361 insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
1362 log::info!("reroll: clearing trim cursors...");
1363 let mut batch = self.keyspace.batch();
1364 for kv in self
1365 .global
1366 .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
1367 &Default::default(),
1368 )?)
1369 {
1370 let (k, _) = kv?;
1371 batch.remove(&self.global, k);
1372 }
1373 let n = batch.len();
1374 batch.commit()?;
1375 log::info!("reroll: cleared {n} trim cursors.");
1376 }
1377 Ok(FjallBackground(self.clone()))
1378 }
1379
1380 fn insert_batch<const LIMIT: usize>(
1381 &mut self,
1382 event_batch: EventBatch<LIMIT>,
1383 ) -> StorageResult<()> {
1384 if event_batch.is_empty() {
1385 return Ok(());
1386 }
1387
1388 let mut batch = self.keyspace.batch();
1389
1390 // would be nice not to have to iterate everything at once here
1391 let latest = event_batch.latest_cursor().unwrap();
1392
1393 for (nsid, commits) in event_batch.commits_by_nsid {
1394 for commit in commits.commits {
1395 let location_key: RecordLocationKey = (&commit, &nsid).into();
1396
1397 match commit.action {
1398 CommitAction::Cut => {
1399 batch.remove(&self.records, &location_key.to_db_bytes()?);
1400 }
1401 CommitAction::Put(put_action) => {
1402 let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
1403 let feed_val: NsidRecordFeedVal =
1404 (&commit.did, &commit.rkey, commit.rev.as_str()).into();
1405 batch.insert(
1406 &self.feeds,
1407 feed_key.to_db_bytes()?,
1408 feed_val.to_db_bytes()?,
1409 );
1410
1411 let location_val: RecordLocationVal =
1412 (commit.cursor, commit.rev.as_str(), put_action).into();
1413 batch.insert(
1414 &self.records,
1415 &location_key.to_db_bytes()?,
1416 &location_val.to_db_bytes()?,
1417 );
1418 }
1419 }
1420 }
1421 let live_counts_key: LiveCountsKey = (latest, &nsid).into();
1422 let counts_value = CountsValue::new(
1423 CommitCounts {
1424 creates: commits.creates as u64,
1425 updates: commits.updates as u64,
1426 deletes: commits.deletes as u64,
1427 },
1428 commits.dids_estimate,
1429 );
1430 batch.insert(
1431 &self.rollups,
1432 &live_counts_key.to_db_bytes()?,
1433 &counts_value.to_db_bytes()?,
1434 );
1435 }
1436
1437 for remove in event_batch.account_removes {
1438 let queue_key = DeleteAccountQueueKey::new(remove.cursor);
1439 let queue_val: DeleteAccountQueueVal = remove.did;
1440 batch.insert(
1441 &self.queues,
1442 &queue_key.to_db_bytes()?,
1443 &queue_val.to_db_bytes()?,
1444 );
1445 }
1446
1447 batch.insert(
1448 &self.global,
1449 DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
1450 latest.to_db_bytes()?,
1451 );
1452
1453 histogram!("storage_insert_batch_db_batch_items").record(batch.len() as f64);
1454 batch.commit()?;
1455 Ok(())
1456 }
1457
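 /// Advance the shared rollup cursor by one step of background work: whichever comes first in
 /// cursor order, a run of pending live-count batches or a queued account deletion, gets
 /// processed, returning how many cursors were stepped plus the collections whose counts were
 /// rolled up (used later for trimming).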
1458 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
1459 let mut dirty_nsids = HashSet::new();
1460
1461 let rollup_cursor =
1462 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
1463 StorageError::BadStateError("Could not find current rollup cursor".to_string()),
1464 )?;
1465
1466 // timelies
1467 let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
1468 let mut timely_iter = self.rollups.range(live_counts_range).peekable();
1469
1470 let timely_next = timely_iter
1471 .peek_mut()
1472 .map(|kv| -> StorageResult<LiveCountsKey> {
1473 match kv {
1474 Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
1475 Ok((key_bytes, _)) => {
1476 let key = db_complete::<LiveCountsKey>(key_bytes)?;
1477 Ok(key)
1478 }
1479 }
1480 })
1481 .transpose()?;
1482
1483 // delete accounts
1484 let delete_accounts_range =
1485 DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
1486
1487 let next_delete = self
1488 .queues
1489 .range(delete_accounts_range)
1490 .next()
1491 .transpose()?
1492 .map(|(key_bytes, val_bytes)| {
1493 db_complete::<DeleteAccountQueueKey>(&key_bytes)
1494 .map(|k| (k.suffix, key_bytes, val_bytes))
1495 })
1496 .transpose()?;
1497
1498 let cursors_stepped = match (timely_next, next_delete) {
1499 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1500 if timely.cursor() < delete_cursor {
1501 let (n, dirty) = self.rollup_live_counts(
1502 timely_iter,
1503 Some(delete_cursor),
1504 MAX_BATCHED_ROLLUP_COUNTS,
1505 )?;
1506 dirty_nsids.extend(dirty);
1507 n
1508 } else {
1509 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1510 }
1511 }
1512 (Some(_), None) => {
1513 let (n, dirty) =
1514 self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?;
1515 dirty_nsids.extend(dirty);
1516 n
1517 }
1518 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1519 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1520 }
1521 (None, None) => 0,
1522 };
1523
1524 Ok((cursors_stepped, dirty_nsids))
1525 }
1526
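 /// Trim one collection's feed: walk it newest-to-oldest (from the stored trim cursor, or the
 /// whole feed when `full_scan`), dropping dangling or stale feed entries along the way, and
 /// once more than `limit` live records have been seen, delete the older records. The cursor of
 /// the first deleted entry is saved as the collection's trim cursor, bounding the next pass.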
1527 fn trim_collection(
1528 &mut self,
1529 collection: &Nsid,
1530 limit: usize,
1531 full_scan: bool,
1532 ) -> StorageResult<(usize, usize, bool)> {
1533 let mut dangling_feed_keys_cleaned = 0;
1534 let mut records_deleted = 0;
1535
1536 let live_range = if full_scan {
1537 let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
1538 let end = NsidRecordFeedKey::prefix_range_end(collection)?;
1539 start..end
1540 } else {
1541 let feed_trim_cursor_key =
1542 TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
1543 let trim_cursor = self
1544 .global
1545 .get(&feed_trim_cursor_key)?
1546 .map(|value_bytes| db_complete(&value_bytes))
1547 .transpose()?
1548 .unwrap_or(Cursor::from_start());
1549 NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
1550 };
1551
1552 let mut live_records_found = 0;
1553 let mut candidate_new_feed_lower_cursor = None;
1554 let ended_early = false;
1555 let mut current_cursor: Option<Cursor> = None;
1556 for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
1557 if i > 0 && i % 500_000 == 0 {
1558 log::info!(
1559 "trim: at {i} for {:?} (now at {})",
1560 collection.to_string(),
1561 current_cursor
1562 .map(|c| c
1563 .elapsed()
1564 .map(nice_duration)
1565 .unwrap_or("[not past]".into()))
1566 .unwrap_or("??".into()),
1567 );
1568 }
1569 let (key_bytes, val_bytes) = kv?;
1570 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
1571 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
1572 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
1573 let location_key_bytes = location_key.to_db_bytes()?;
1574
1575 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
1576 // record was deleted (hopefully)
1577 self.feeds.remove(&*key_bytes)?;
1578 dangling_feed_keys_cleaned += 1;
1579 continue;
1580 };
1581
1582 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
1583 current_cursor = Some(meta.cursor());
1584
1585 if meta.cursor() != feed_key.cursor() {
1586 // older/different version
1587 self.feeds.remove(&*key_bytes)?;
1588 dangling_feed_keys_cleaned += 1;
1589 continue;
1590 }
1591 if meta.rev != feed_val.rev() {
1592 // weird...
 log::warn!("record lookup: cursor matched but rev did not...? removing.");
1594 self.records.remove(&location_key_bytes)?;
1595 self.feeds.remove(&*key_bytes)?;
1596 dangling_feed_keys_cleaned += 1;
1597 continue;
1598 }
1599
1600 live_records_found += 1;
1601 if live_records_found <= limit {
1602 continue;
1603 }
1604 if candidate_new_feed_lower_cursor.is_none() {
1605 candidate_new_feed_lower_cursor = Some(feed_key.cursor());
1606 }
1607
1608 self.records.remove(&location_key_bytes)?;
1609 self.feeds.remove(key_bytes)?;
1610 records_deleted += 1;
1611 }
1612
1613 if !ended_early {
1614 if let Some(new_cursor) = candidate_new_feed_lower_cursor {
1615 self.global.insert(
1616 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
1617 &new_cursor.to_db_bytes()?,
1618 )?;
1619 }
1620 }
1621
1622 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
1623 Ok((dangling_feed_keys_cleaned, records_deleted, ended_early))
1624 }
1625
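 /// Delete every stored record for a DID, committing the fjall batch in chunks of
 /// `MAX_BATCHED_ACCOUNT_DELETE_RECORDS` so a very large account can't build up an unbounded
 /// write batch. Returns how many records were removed.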
1626 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
1627 let mut records_deleted = 0;
1628 let mut batch = self.keyspace.batch();
1629 let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?;
1630 for kv in self.records.prefix(prefix) {
1631 let (key_bytes, _) = kv?;
1632 batch.remove(&self.records, key_bytes);
1633 records_deleted += 1;
1634 if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
1635 counter!("storage_delete_account_partial_commits").increment(1);
1636 batch.commit()?;
1637 batch = self.keyspace.batch();
1638 }
1639 }
1640 counter!("storage_delete_account_completions").increment(1);
1641 counter!("storage_delete_account_records_deleted").increment(records_deleted as u64);
1642 batch.commit()?;
1643 Ok(records_deleted)
1644 }
1645}
1646
1647pub struct FjallBackground(FjallWriter);
1648
1649#[async_trait]
1650impl StoreBackground for FjallBackground {
1651 async fn run(mut self, backfill: bool) -> StorageResult<()> {
1652 let mut dirty_nsids = HashSet::new();
1653
1654 // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims
1655 // shorter once those are done helps things catch up
1656 // the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine
1657 let mut rollup =
1658 tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
1659 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1660
1661 // backfill condition again iffy. collection trims should probably happen in their own phase.
1662 let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
1663 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1664
1665 loop {
1666 tokio::select! {
1667 _ = rollup.tick() => {
1668 let mut db = self.0.clone();
1669 let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
1670 if n == 0 {
1671 rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
1672 }
1673 dirty_nsids.extend(dirty);
1674 log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
1675 },
1676 _ = trim.tick() => {
1677 let n = dirty_nsids.len();
1678 log::trace!("trimming {n} nsids: {dirty_nsids:?}");
1679 let t0 = Instant::now();
1680 let (mut total_danglers, mut total_deleted) = (0, 0);
1681 let mut completed = HashSet::new();
1682 for collection in &dirty_nsids {
1683 let mut db = self.0.clone();
1684 let c = collection.clone();
1685 let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
1686 total_danglers += danglers;
1687 total_deleted += deleted;
1688 if !ended_early {
1689 completed.insert(collection.clone());
1690 }
1691 if total_deleted > 10_000_000 {
1692 log::info!("trim stopped early, more than 10M records already deleted.");
1693 break;
1694 }
1695 }
1696 let dt = t0.elapsed();
1697 log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
1698 histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
1699 histogram!("storage_trim_duration").record(dt.as_micros() as f64);
1700 counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
1701 if total_deleted >= total_danglers {
1702 counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
1703 } else {
1704 // TODO: probably think through what's happening here
1705 log::warn!("weird trim case: more danglers than deleted? metric will be missing for dangling=false. deleted={total_deleted} danglers={total_danglers}");
1706 }
1707 for c in completed {
1708 dirty_nsids.remove(&c);
1709 }
1710 },
1711 };
1712 }
1713 }
1714}
1715
1716/// Get a value from a fixed key
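///
/// A minimal usage sketch (not compiled as a doctest; assumes a `global`
/// `PartitionHandle` is in scope), reading the persisted jetstream cursor:
///
/// ```ignore
/// let cursor: Option<JetstreamCursorValue> =
///     get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
/// ```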
1717fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
1718 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1719 let value = global
1720 .get(&key_bytes)?
1721 .map(|value_bytes| db_complete(&value_bytes))
1722 .transpose()?;
1723 Ok(value)
1724}
1725
1726/// Get a value from a fixed key, reading from a partition snapshot
1727fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
1728 global: &fjall::Snapshot,
1729) -> StorageResult<Option<V>> {
1730 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1731 let value = global
1732 .get(&key_bytes)?
1733 .map(|value_bytes| db_complete(&value_bytes))
1734 .transpose()?;
1735 Ok(value)
1736}
1737
1738/// Set a value to a fixed key
1739fn insert_static_neu<K: StaticStr>(
1740 global: &PartitionHandle,
1741 value: impl DbBytes,
1742) -> StorageResult<()> {
1743 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1744 let value_bytes = value.to_db_bytes()?;
1745 global.insert(&key_bytes, &value_bytes)?;
1746 Ok(())
1747}
1748
1749/// Set a value to a fixed key, erroring if the value already exists
1750///
1751/// Intended for single-threaded init: not safe under concurrency, since there
1752/// is no transaction between checking whether the value already exists and writing it.
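///
/// A minimal usage sketch (not compiled as a doctest; assumes a `global`
/// `PartitionHandle` and a freshly generated `secret: SketchSecretPrefix` are
/// in scope), persisting the cardinality-estimator secret once at startup:
///
/// ```ignore
/// init_static_neu::<SketchSecretKey>(&global, secret)?;
/// ```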
1753fn init_static_neu<K: StaticStr>(
1754 global: &PartitionHandle,
1755 value: impl DbBytes,
1756) -> StorageResult<()> {
1757 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1758 if global.get(&key_bytes)?.is_some() {
1759 return Err(StorageError::InitError(format!(
1760 "init failed: value for key {key_bytes:?} already exists"
1761 )));
1762 }
1763 let value_bytes = value.to_db_bytes()?;
1764 global.insert(&key_bytes, &value_bytes)?;
1765 Ok(())
1766}
1767
1768/// Stage a value for a fixed key in an existing batch (the caller commits the batch)
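///
/// A minimal usage sketch (not compiled as a doctest; assumes a `keyspace`,
/// a `global` partition, and a `cursor: NewRollupCursorValue` in scope):
///
/// ```ignore
/// let mut batch = keyspace.batch();
/// insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &global, cursor)?;
/// batch.commit()?;
/// ```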
1769fn insert_batch_static_neu<K: StaticStr>(
1770 batch: &mut FjallBatch,
1771 global: &PartitionHandle,
1772 value: impl DbBytes,
1773) -> StorageResult<()> {
1774 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1775 let value_bytes = value.to_db_bytes()?;
1776 batch.insert(global, &key_bytes, &value_bytes);
1777 Ok(())
1778}
1779
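/// Point-in-time storage stats derived from the fjall keyspace (serializable for status reporting).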
1780#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
1781pub struct StorageInfo {
1782 pub keyspace_disk_space: u64,
1783 pub keyspace_journal_count: usize,
1784 pub keyspace_sequence: u64,
1785 pub global_approximate_len: usize,
1786}
1787
1788////////// temp stuff to remove:
1789
1790#[cfg(test)]
1791mod tests {
1792 use super::*;
1793 use crate::{DeleteAccount, RecordKey, UFOsCommit};
1794 use jetstream::events::{CommitEvent, CommitOp};
1795 use jetstream::exports::Cid;
1796 use serde_json::value::RawValue;
1797
1798 fn fjall_db() -> (FjallReader, FjallWriter) {
1799 let (read, write, _, _) = FjallStorage::init(
1800 tempfile::tempdir().unwrap(),
1801 "offline test (no real jetstream endpoint)".to_string(),
1802 false,
1803 FjallConfig { temp: true },
1804 )
1805 .unwrap();
1806 (read, write)
1807 }
1808
1809 const TEST_BATCH_LIMIT: usize = 16;
1810 fn beginning() -> HourTruncatedCursor {
1811 Cursor::from_start().into()
1812 }
1813
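    /// Test helper that accumulates simulated jetstream commits (creates, updates,
    /// deletes) and account removals into an `EventBatch` for insertion.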
1814 #[derive(Debug, Default)]
1815 struct TestBatch {
1816 pub batch: EventBatch<TEST_BATCH_LIMIT>,
1817 }
1818
1819 impl TestBatch {
1820 #[allow(clippy::too_many_arguments)]
1821 pub fn create(
1822 &mut self,
1823 did: &str,
1824 collection: &str,
1825 rkey: &str,
1826 record: &str,
1827 rev: Option<&str>,
1828 cid: Option<Cid>,
1829 cursor: u64,
1830 ) -> Nsid {
1831 let did = Did::new(did.to_string()).unwrap();
1832 let collection = Nsid::new(collection.to_string()).unwrap();
1833 let record = RawValue::from_string(record.to_string()).unwrap();
1834 let cid = cid.unwrap_or(
1835 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1836 .parse()
1837 .unwrap(),
1838 );
1839
1840 let event = CommitEvent {
1841 collection,
1842 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1843 rev: rev.unwrap_or("asdf").to_string(),
1844 operation: CommitOp::Create,
1845 record: Some(record),
1846 cid: Some(cid),
1847 };
1848
1849 let (commit, collection) =
1850 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1851 .unwrap();
1852
1853 self.batch
1854 .commits_by_nsid
1855 .entry(collection.clone())
1856 .or_default()
1857 .truncating_insert(commit, &[0u8; 16])
1858 .unwrap();
1859
1860 collection
1861 }
1862 #[allow(clippy::too_many_arguments)]
1863 pub fn update(
1864 &mut self,
1865 did: &str,
1866 collection: &str,
1867 rkey: &str,
1868 record: &str,
1869 rev: Option<&str>,
1870 cid: Option<Cid>,
1871 cursor: u64,
1872 ) -> Nsid {
1873 let did = Did::new(did.to_string()).unwrap();
1874 let collection = Nsid::new(collection.to_string()).unwrap();
1875 let record = RawValue::from_string(record.to_string()).unwrap();
1876 let cid = cid.unwrap_or(
1877 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1878 .parse()
1879 .unwrap(),
1880 );
1881
1882 let event = CommitEvent {
1883 collection,
1884 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1885 rev: rev.unwrap_or("asdf").to_string(),
1886 operation: CommitOp::Update,
1887 record: Some(record),
1888 cid: Some(cid),
1889 };
1890
1891 let (commit, collection) =
1892 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1893 .unwrap();
1894
1895 self.batch
1896 .commits_by_nsid
1897 .entry(collection.clone())
1898 .or_default()
1899 .truncating_insert(commit, &[0u8; 16])
1900 .unwrap();
1901
1902 collection
1903 }
1904 #[allow(clippy::too_many_arguments)]
1905 pub fn delete(
1906 &mut self,
1907 did: &str,
1908 collection: &str,
1909 rkey: &str,
1910 rev: Option<&str>,
1911 cursor: u64,
1912 ) -> Nsid {
1913 let did = Did::new(did.to_string()).unwrap();
1914 let collection = Nsid::new(collection.to_string()).unwrap();
1915 let event = CommitEvent {
1916 collection,
1917 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1918 rev: rev.unwrap_or("asdf").to_string(),
1919 operation: CommitOp::Delete,
1920 record: None,
1921 cid: None,
1922 };
1923
1924 let (commit, collection) =
1925 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap();
1926
1927 self.batch
1928 .commits_by_nsid
1929 .entry(collection.clone())
1930 .or_default()
1931 .truncating_insert(commit, &[0u8; 16])
1932 .unwrap();
1933
1934 collection
1935 }
1936 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did {
1937 let did = Did::new(did.to_string()).unwrap();
1938 self.batch.account_removes.push(DeleteAccount {
1939 did: did.clone(),
1940 cursor: Cursor::from_raw_u64(cursor),
1941 });
1942 did
1943 }
1944 }
1945
1946 #[test]
1947 fn test_hello() -> anyhow::Result<()> {
1948 let (read, mut write) = fjall_db();
1949 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
1950 let JustCount {
1951 creates,
1952 dids_estimate,
1953 ..
1954 } = read.get_collection_counts(
1955 &Nsid::new("a.b.c".to_string()).unwrap(),
1956 beginning(),
1957 None,
1958 )?;
1959 assert_eq!(creates, 0);
1960 assert_eq!(dids_estimate, 0);
1961 Ok(())
1962 }
1963
1964 #[test]
1965 fn test_insert_one() -> anyhow::Result<()> {
1966 let (read, mut write) = fjall_db();
1967
1968 let mut batch = TestBatch::default();
1969 let collection = batch.create(
1970 "did:plc:inze6wrmsm7pjl7yta3oig77",
1971 "a.b.c",
1972 "asdf",
1973 "{}",
1974 Some("rev-z"),
1975 None,
1976 100,
1977 );
1978 write.insert_batch(batch.batch)?;
1979 write.step_rollup()?;
1980
1981 let JustCount {
1982 creates,
1983 dids_estimate,
1984 ..
1985 } = read.get_collection_counts(&collection, beginning(), None)?;
1986 assert_eq!(creates, 1);
1987 assert_eq!(dids_estimate, 1);
1988 let JustCount {
1989 creates,
1990 dids_estimate,
1991 ..
1992 } = read.get_collection_counts(
1993 &Nsid::new("d.e.f".to_string()).unwrap(),
1994 beginning(),
1995 None,
1996 )?;
1997 assert_eq!(creates, 0);
1998 assert_eq!(dids_estimate, 0);
1999
2000 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2001 assert_eq!(records.len(), 1);
2002 let rec = &records[0];
2003 assert_eq!(rec.record.get(), "{}");
2004 assert!(!rec.is_update);
2005
2006 let records = read.get_records_by_collections(
2007 [Nsid::new("d.e.f".to_string()).unwrap()].into(),
2008 2,
2009 false,
2010 )?;
2011 assert_eq!(records.len(), 0);
2012
2013 Ok(())
2014 }
2015
2016 #[test]
2017 fn test_get_multi_collection() -> anyhow::Result<()> {
2018 let (read, mut write) = fjall_db();
2019
2020 let mut batch = TestBatch::default();
2021 batch.create(
2022 "did:plc:inze6wrmsm7pjl7yta3oig77",
2023 "a.a.a",
2024 "aaa",
2025 r#""earliest""#,
2026 Some("rev-a"),
2027 None,
2028 100,
2029 );
2030 batch.create(
2031 "did:plc:inze6wrmsm7pjl7yta3oig77",
2032 "a.a.b",
2033 "aab",
2034 r#""in between""#,
2035 Some("rev-ab"),
2036 None,
2037 101,
2038 );
2039 batch.create(
2040 "did:plc:inze6wrmsm7pjl7yta3oig77",
2041 "a.a.a",
2042 "aaa-2",
2043 r#""last""#,
2044 Some("rev-a-2"),
2045 None,
2046 102,
2047 );
2048 write.insert_batch(batch.batch)?;
2049
2050 let records = read.get_records_by_collections(
2051 HashSet::from([
2052 Nsid::new("a.a.a".to_string()).unwrap(),
2053 Nsid::new("a.a.b".to_string()).unwrap(),
2054 Nsid::new("a.a.c".to_string()).unwrap(),
2055 ]),
2056 100,
2057 false,
2058 )?;
2059 assert_eq!(records.len(), 3);
2060 assert_eq!(records[0].record.get(), r#""last""#);
2061 assert_eq!(
2062 records[0].collection,
2063 Nsid::new("a.a.a".to_string()).unwrap()
2064 );
2065 assert_eq!(records[1].record.get(), r#""in between""#);
2066 assert_eq!(
2067 records[1].collection,
2068 Nsid::new("a.a.b".to_string()).unwrap()
2069 );
2070 assert_eq!(records[2].record.get(), r#""earliest""#);
2071 assert_eq!(
2072 records[2].collection,
2073 Nsid::new("a.a.a".to_string()).unwrap()
2074 );
2075
2076 Ok(())
2077 }
2078
2079 #[test]
2080 fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
2081 let (read, mut write) = fjall_db();
2082
2083 let mut batch = TestBatch::default();
2084 // insert some older ones in aab
2085 for i in 1..=3 {
2086 batch.create(
2087 "did:plc:inze6wrmsm7pjl7yta3oig77",
2088 "a.a.b",
2089 &format!("aab-{i}"),
2090 &format!(r#""b {i}""#),
2091 Some(&format!("rev-b-{i}")),
2092 None,
2093 100 + i,
2094 );
2095 }
2096 // and some newer ones in aaa
2097 for i in 1..=3 {
2098 batch.create(
2099 "did:plc:inze6wrmsm7pjl7yta3oig77",
2100 "a.a.a",
2101 &format!("aaa-{i}"),
2102 &format!(r#""a {i}""#),
2103 Some(&format!("rev-a-{i}")),
2104 None,
2105 200 + i,
2106 );
2107 }
2108 write.insert_batch(batch.batch)?;
2109
2110 let records = read.get_records_by_collections(
2111 HashSet::from([
2112 Nsid::new("a.a.a".to_string()).unwrap(),
2113 Nsid::new("a.a.b".to_string()).unwrap(),
2114 Nsid::new("a.a.c".to_string()).unwrap(),
2115 ]),
2116 2,
2117 true,
2118 )?;
2119 assert_eq!(records.len(), 4);
2120 assert_eq!(records[0].record.get(), r#""a 3""#);
2121 assert_eq!(
2122 records[0].collection,
2123 Nsid::new("a.a.a".to_string()).unwrap()
2124 );
2125
2126 assert_eq!(records[3].record.get(), r#""b 2""#);
2127 assert_eq!(
2128 records[3].collection,
2129 Nsid::new("a.a.b".to_string()).unwrap()
2130 );
2131
2132 Ok(())
2133 }
2134
2135 #[test]
2136 fn test_update_one() -> anyhow::Result<()> {
2137 let (read, mut write) = fjall_db();
2138
2139 let mut batch = TestBatch::default();
2140 let collection = batch.create(
2141 "did:plc:inze6wrmsm7pjl7yta3oig77",
2142 "a.b.c",
2143 "rkey-asdf",
2144 "{}",
2145 Some("rev-a"),
2146 None,
2147 100,
2148 );
2149 write.insert_batch(batch.batch)?;
2150
2151 let mut batch = TestBatch::default();
2152 batch.update(
2153 "did:plc:inze6wrmsm7pjl7yta3oig77",
2154 "a.b.c",
2155 "rkey-asdf",
2156 r#"{"ch": "ch-ch-ch-changes"}"#,
2157 Some("rev-z"),
2158 None,
2159 101,
2160 );
2161 write.insert_batch(batch.batch)?;
2162 write.step_rollup()?;
2163
2164 let JustCount {
2165 creates,
2166 dids_estimate,
2167 ..
2168 } = read.get_collection_counts(&collection, beginning(), None)?;
2169 assert_eq!(creates, 1);
2170 assert_eq!(dids_estimate, 1);
2171
2172 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2173 assert_eq!(records.len(), 1);
2174 let rec = &records[0];
2175 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
2176 assert!(rec.is_update);
2177 Ok(())
2178 }
2179
2180 #[test]
2181 fn test_delete_one() -> anyhow::Result<()> {
2182 let (read, mut write) = fjall_db();
2183
2184 let mut batch = TestBatch::default();
2185 let collection = batch.create(
2186 "did:plc:inze6wrmsm7pjl7yta3oig77",
2187 "a.b.c",
2188 "rkey-asdf",
2189 "{}",
2190 Some("rev-a"),
2191 None,
2192 100,
2193 );
2194 write.insert_batch(batch.batch)?;
2195
2196 let mut batch = TestBatch::default();
2197 batch.delete(
2198 "did:plc:inze6wrmsm7pjl7yta3oig77",
2199 "a.b.c",
2200 "rkey-asdf",
2201 Some("rev-z"),
2202 101,
2203 );
2204 write.insert_batch(batch.batch)?;
2205 write.step_rollup()?;
2206
2207 let JustCount {
2208 creates,
2209 dids_estimate,
2210 ..
2211 } = read.get_collection_counts(&collection, beginning(), None)?;
2212 assert_eq!(creates, 1);
2213 assert_eq!(dids_estimate, 1);
2214
2215 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2216 assert_eq!(records.len(), 0);
2217
2218 Ok(())
2219 }
2220
2221 #[test]
2222 fn test_collection_trim() -> anyhow::Result<()> {
2223 let (read, mut write) = fjall_db();
2224
2225 let mut batch = TestBatch::default();
2226 batch.create(
2227 "did:plc:inze6wrmsm7pjl7yta3oig77",
2228 "a.a.a",
2229 "rkey-aaa",
2230 "{}",
2231 Some("rev-aaa"),
2232 None,
2233 10_000,
2234 );
2235 let mut last_b_cursor;
2236 for i in 1..=10 {
2237 last_b_cursor = 11_000 + i;
2238 batch.create(
2239 &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
2240 "a.a.b",
2241 &format!("rkey-bbb-{i}"),
2242 &format!(r#"{{"n": {i}}}"#),
2243 Some(&format!("rev-bbb-{i}")),
2244 None,
2245 last_b_cursor,
2246 );
2247 }
2248 batch.create(
2249 "did:plc:inze6wrmsm7pjl7yta3oig77",
2250 "a.a.c",
2251 "rkey-ccc",
2252 "{}",
2253 Some("rev-ccc"),
2254 None,
2255 12_000,
2256 );
2257
2258 write.insert_batch(batch.batch)?;
2259
2260 let records = read.get_records_by_collections(
2261 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2262 100,
2263 false,
2264 )?;
2265 assert_eq!(records.len(), 1);
2266 let records = read.get_records_by_collections(
2267 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2268 100,
2269 false,
2270 )?;
2271 assert_eq!(records.len(), 10);
2272 let records = read.get_records_by_collections(
2273 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2274 100,
2275 false,
2276 )?;
2277 assert_eq!(records.len(), 1);
2278 let records = read.get_records_by_collections(
2279 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2280 100,
2281 false,
2282 )?;
2283 assert_eq!(records.len(), 0);
2284
2285 write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
2286 write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
2287 write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
2288 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
2289
2290 let records = read.get_records_by_collections(
2291 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2292 100,
2293 false,
2294 )?;
2295 assert_eq!(records.len(), 1);
2296 let records = read.get_records_by_collections(
2297 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2298 100,
2299 false,
2300 )?;
2301 assert_eq!(records.len(), 6);
2302 let records = read.get_records_by_collections(
2303 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2304 100,
2305 false,
2306 )?;
2307 assert_eq!(records.len(), 1);
2308 let records = read.get_records_by_collections(
2309 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2310 100,
2311 false,
2312 )?;
2313 assert_eq!(records.len(), 0);
2314
2315 Ok(())
2316 }
2317
2318 #[test]
2319 fn test_delete_account() -> anyhow::Result<()> {
2320 let (read, mut write) = fjall_db();
2321
2322 let mut batch = TestBatch::default();
2323 batch.create(
2324 "did:plc:person-a",
2325 "a.a.a",
2326 "rkey-aaa",
2327 "{}",
2328 Some("rev-aaa"),
2329 None,
2330 10_000,
2331 );
2332 for i in 1..=2 {
2333 batch.create(
2334 "did:plc:person-b",
2335 "a.a.a",
2336 &format!("rkey-bbb-{i}"),
2337 &format!(r#"{{"n": {i}}}"#),
2338 Some(&format!("rev-bbb-{i}")),
2339 None,
2340 11_000 + i,
2341 );
2342 }
2343 write.insert_batch(batch.batch)?;
2344
2345 let records = read.get_records_by_collections(
2346 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2347 100,
2348 false,
2349 )?;
2350 assert_eq!(records.len(), 3);
2351
2352 let records_deleted =
2353 write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
2354 assert_eq!(records_deleted, 2);
2355
2356 let records = read.get_records_by_collections(
2357 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2358 100,
2359 false,
2360 )?;
2361 assert_eq!(records.len(), 1);
2362
2363 Ok(())
2364 }
2365
2366 #[test]
2367 fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
2368 let (read, mut write) = fjall_db();
2369
2370 let mut batch = TestBatch::default();
2371 batch.create(
2372 "did:plc:person-a",
2373 "a.a.a",
2374 "rkey-aaa",
2375 "{}",
2376 Some("rev-aaa"),
2377 None,
2378 10_000,
2379 );
2380 write.insert_batch(batch.batch)?;
2381
2382 let mut batch = TestBatch::default();
2383 batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
2384 write.insert_batch(batch.batch)?;
2385
2386 write.step_rollup()?;
2387
2388 let records = read.get_records_by_collections(
2389 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2390 1,
2391 false,
2392 )?;
2393 assert_eq!(records.len(), 0);
2394
2395 Ok(())
2396 }
2397
2398 #[test]
2399 fn rollup_delete_live_count_step() -> anyhow::Result<()> {
2400 let (read, mut write) = fjall_db();
2401
2402 let mut batch = TestBatch::default();
2403 batch.create(
2404 "did:plc:person-a",
2405 "a.a.a",
2406 "rkey-aaa",
2407 "{}",
2408 Some("rev-aaa"),
2409 None,
2410 10_000,
2411 );
2412 write.insert_batch(batch.batch)?;
2413
2414 let (n, _) = write.step_rollup()?;
2415 assert_eq!(n, 1);
2416
2417 let mut batch = TestBatch::default();
2418 batch.delete_account("did:plc:person-a", 10_001);
2419 write.insert_batch(batch.batch)?;
2420
2421 let records = read.get_records_by_collections(
2422 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2423 1,
2424 false,
2425 )?;
2426 assert_eq!(records.len(), 1);
2427
2428 let (n, _) = write.step_rollup()?;
2429 assert_eq!(n, 1);
2430
2431 let records = read.get_records_by_collections(
2432 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2433 1,
2434 false,
2435 )?;
2436 assert_eq!(records.len(), 0);
2437
2438 let mut batch = TestBatch::default();
2439 batch.delete_account("did:plc:person-a", 9_999);
2440 write.insert_batch(batch.batch)?;
2441
2442 let (n, _) = write.step_rollup()?;
2443 assert_eq!(n, 0);
2444
2445 Ok(())
2446 }
2447
2448 #[test]
2449 fn rollup_multiple_count_batches() -> anyhow::Result<()> {
2450 let (_read, mut write) = fjall_db();
2451
2452 let mut batch = TestBatch::default();
2453 batch.create(
2454 "did:plc:person-a",
2455 "a.a.a",
2456 "rkey-aaa",
2457 "{}",
2458 Some("rev-aaa"),
2459 None,
2460 10_000,
2461 );
2462 write.insert_batch(batch.batch)?;
2463
2464 let mut batch = TestBatch::default();
2465 batch.create(
2466 "did:plc:person-a",
2467 "a.a.a",
2468 "rkey-aab",
2469 "{}",
2470 Some("rev-aab"),
2471 None,
2472 10_001,
2473 );
2474 write.insert_batch(batch.batch)?;
2475
2476 let (n, _) = write.step_rollup()?;
2477 assert_eq!(n, 2);
2478
2479 let (n, _) = write.step_rollup()?;
2480 assert_eq!(n, 0);
2481
2482 Ok(())
2483 }
2484
2485 #[test]
2486 fn counts_before_and_after_rollup() -> anyhow::Result<()> {
2487 let (read, mut write) = fjall_db();
2488
2489 let mut batch = TestBatch::default();
2490 batch.create(
2491 "did:plc:person-a",
2492 "a.a.a",
2493 "rkey-aaa",
2494 "{}",
2495 Some("rev-aaa"),
2496 None,
2497 10_000,
2498 );
2499 batch.create(
2500 "did:plc:person-b",
2501 "a.a.a",
2502 "rkey-bbb",
2503 "{}",
2504 Some("rev-bbb"),
2505 None,
2506 10_001,
2507 );
2508 write.insert_batch(batch.batch)?;
2509
2510 let mut batch = TestBatch::default();
2511 batch.delete_account("did:plc:person-a", 11_000);
2512 write.insert_batch(batch.batch)?;
2513
2514 let mut batch = TestBatch::default();
2515 batch.create(
2516 "did:plc:person-a",
2517 "a.a.a",
2518 "rkey-aac",
2519 "{}",
2520 Some("rev-aac"),
2521 None,
2522 12_000,
2523 );
2524 write.insert_batch(batch.batch)?;
2525
2526 // before any rollup
2527 let JustCount {
2528 creates,
2529 dids_estimate,
2530 ..
2531 } = read.get_collection_counts(
2532 &Nsid::new("a.a.a".to_string()).unwrap(),
2533 beginning(),
2534 None,
2535 )?;
2536 assert_eq!(creates, 0);
2537 assert_eq!(dids_estimate, 0);
2538
2539 // first batch rolled up
2540 let (n, _) = write.step_rollup()?;
2541 assert_eq!(n, 1);
2542
2543 let JustCount {
2544 creates,
2545 dids_estimate,
2546 ..
2547 } = read.get_collection_counts(
2548 &Nsid::new("a.a.a".to_string()).unwrap(),
2549 beginning(),
2550 None,
2551 )?;
2552 assert_eq!(creates, 2);
2553 assert_eq!(dids_estimate, 2);
2554
2555 // delete account rolled up
2556 let (n, _) = write.step_rollup()?;
2557 assert_eq!(n, 1);
2558
2559 let JustCount {
2560 creates,
2561 dids_estimate,
2562 ..
2563 } = read.get_collection_counts(
2564 &Nsid::new("a.a.a".to_string()).unwrap(),
2565 beginning(),
2566 None,
2567 )?;
2568 assert_eq!(creates, 2);
2569 assert_eq!(dids_estimate, 2);
2570
2571 // second batch rolled up
2572 let (n, _) = write.step_rollup()?;
2573 assert_eq!(n, 1);
2574
2575 let JustCount {
2576 creates,
2577 dids_estimate,
2578 ..
2579 } = read.get_collection_counts(
2580 &Nsid::new("a.a.a".to_string()).unwrap(),
2581 beginning(),
2582 None,
2583 )?;
2584 assert_eq!(creates, 3);
2585 assert_eq!(dids_estimate, 2);
2586
2587 // no more rollups left
2588 let (n, _) = write.step_rollup()?;
2589 assert_eq!(n, 0);
2590
2591 Ok(())
2592 }
2593
2594 #[test]
2595 fn get_prefix_children_lexi_empty() {
2596 let (read, _) = fjall_db();
2597 let (
2598 JustCount {
2599 creates,
2600 dids_estimate,
2601 ..
2602 },
2603 children,
2604 cursor,
2605 ) = read
2606 .get_prefix(
2607 NsidPrefix::new("aaa.aaa").unwrap(),
2608 10,
2609 OrderCollectionsBy::Lexi { cursor: None },
2610 None,
2611 None,
2612 )
2613 .unwrap();
2614
2615 assert_eq!(creates, 0);
2616 assert_eq!(dids_estimate, 0);
2617 assert_eq!(children, vec![]);
2618 assert_eq!(cursor, None);
2619 }
2620
2621 #[test]
2622 fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
2623 let (read, mut write) = fjall_db();
2624
2625 let mut batch = TestBatch::default();
2626 batch.create(
2627 "did:plc:person-a",
2628 "a.a.a",
2629 "rkey-aaa",
2630 "{}",
2631 Some("rev-aaa"),
2632 None,
2633 10_000,
2634 );
2635 write.insert_batch(batch.batch)?;
2636 write.step_rollup()?;
2637
2638 let (
2639 JustCount {
2640 creates,
2641 dids_estimate,
2642 ..
2643 },
2644 children,
2645 cursor,
2646 ) = read.get_prefix(
2647 NsidPrefix::new("a.a.a").unwrap(),
2648 10,
2649 OrderCollectionsBy::Lexi { cursor: None },
2650 None,
2651 None,
2652 )?;
2653 assert_eq!(creates, 0);
2654 assert_eq!(dids_estimate, 0);
2655 assert_eq!(children, vec![]);
2656 assert_eq!(cursor, None);
2657 Ok(())
2658 }
2659
2660 #[test]
2661 fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
2662 let (read, mut write) = fjall_db();
2663
2664 let mut batch = TestBatch::default();
2665 batch.create(
2666 "did:plc:person-a",
2667 "a.a.aa",
2668 "rkey-aaa",
2669 "{}",
2670 Some("rev-aaa"),
2671 None,
2672 10_000,
2673 );
2674 write.insert_batch(batch.batch)?;
2675 write.step_rollup()?;
2676
2677 let (
2678 JustCount {
2679 creates,
2680 dids_estimate,
2681 ..
2682 },
2683 children,
2684 cursor,
2685 ) = read.get_prefix(
2686 NsidPrefix::new("a.a.a").unwrap(),
2687 10,
2688 OrderCollectionsBy::Lexi { cursor: None },
2689 None,
2690 None,
2691 )?;
2692 assert_eq!(creates, 0);
2693 assert_eq!(dids_estimate, 0);
2694 assert_eq!(children, vec![]);
2695 assert_eq!(cursor, None);
2696 Ok(())
2697 }
2698
2699 #[test]
2700 fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
2701 let (read, mut write) = fjall_db();
2702
2703 let mut batch = TestBatch::default();
2704 batch.create(
2705 "did:plc:person-a",
2706 "a.a.a",
2707 "rkey-aaa",
2708 "{}",
2709 Some("rev-aaa"),
2710 None,
2711 10_000,
2712 );
2713 write.insert_batch(batch.batch)?;
2714 write.step_rollup()?;
2715
2716 let (
2717 JustCount {
2718 creates,
2719 dids_estimate,
2720 ..
2721 },
2722 children,
2723 cursor,
2724 ) = read.get_prefix(
2725 NsidPrefix::new("a.a").unwrap(),
2726 10,
2727 OrderCollectionsBy::Lexi { cursor: None },
2728 None,
2729 None,
2730 )?;
2731 assert_eq!(creates, 1);
2732 assert_eq!(dids_estimate, 1);
2733 assert_eq!(
2734 children,
2735 vec![PrefixChild::Collection(NsidCount {
2736 nsid: "a.a.a".to_string(),
2737 creates: 1,
2738 updates: 0,
2739 deletes: 0,
2740 dids_estimate: 1
2741 }),]
2742 );
2743 assert_eq!(cursor, None);
2744 Ok(())
2745 }
2746
2747 #[test]
2748 fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
2749 let (read, mut write) = fjall_db();
2750
2751 let mut batch = TestBatch::default();
2752 batch.create(
2753 "did:plc:person-a",
2754 "a.a.a.a",
2755 "rkey-aaaa",
2756 "{}",
2757 Some("rev-aaaa"),
2758 None,
2759 10_000,
2760 );
2761 write.insert_batch(batch.batch)?;
2762 write.step_rollup()?;
2763
2764 let (
2765 JustCount {
2766 creates,
2767 dids_estimate,
2768 ..
2769 },
2770 children,
2771 cursor,
2772 ) = read.get_prefix(
2773 NsidPrefix::new("a.a").unwrap(),
2774 10,
2775 OrderCollectionsBy::Lexi { cursor: None },
2776 None,
2777 None,
2778 )?;
2779 assert_eq!(creates, 1);
2780 assert_eq!(dids_estimate, 1);
2781 assert_eq!(
2782 children,
2783 vec![PrefixChild::Prefix(PrefixCount {
2784 prefix: "a.a.a".to_string(),
2785 creates: 1,
2786 updates: 0,
2787 deletes: 0,
2788 dids_estimate: 1,
2789 }),]
2790 );
2791 assert_eq!(cursor, None);
2792 Ok(())
2793 }
2794
2795 #[test]
2796 fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
2797 let (read, mut write) = fjall_db();
2798
2799 let mut batch = TestBatch::default();
2800 batch.create(
2801 "did:plc:person-a",
2802 "a.a.a.a",
2803 "rkey-aaaa",
2804 "{}",
2805 Some("rev-aaaa"),
2806 None,
2807 10_000,
2808 );
2809 batch.create(
2810 "did:plc:person-a",
2811 "a.a.a.b",
2812 "rkey-aaab",
2813 "{}",
2814 Some("rev-aaab"),
2815 None,
2816 10_001,
2817 );
2818 write.insert_batch(batch.batch)?;
2819 write.step_rollup()?;
2820
2821 let (
2822 JustCount {
2823 creates,
2824 dids_estimate,
2825 ..
2826 },
2827 children,
2828 cursor,
2829 ) = read.get_prefix(
2830 NsidPrefix::new("a.a").unwrap(),
2831 10,
2832 OrderCollectionsBy::Lexi { cursor: None },
2833 None,
2834 None,
2835 )?;
2836 assert_eq!(creates, 2);
2837 assert_eq!(dids_estimate, 1);
2838 assert_eq!(
2839 children,
2840 vec![PrefixChild::Prefix(PrefixCount {
2841 prefix: "a.a.a".to_string(),
2842 creates: 2,
2843 updates: 0,
2844 deletes: 0,
2845 dids_estimate: 1
2846 }),]
2847 );
2848 assert_eq!(cursor, None);
2849 Ok(())
2850 }
2851
2852 #[test]
2853 fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
2854 let (read, mut write) = fjall_db();
2855
2856 let mut batch = TestBatch::default();
2857 // exact:
2858 batch.create(
2859 "did:plc:person-a",
2860 "a.a.a",
2861 "rkey-aaa",
2862 "{}",
2863 Some("rev-aaa"),
2864 None,
2865 10_000,
2866 );
2867 // child:
2868 batch.create(
2869 "did:plc:person-a",
2870 "a.a.a.a",
2871 "rkey-aaaa",
2872 "{}",
2873 Some("rev-aaaa"),
2874 None,
2875 10_001,
2876 );
2877 // prefix:
2878 batch.create(
2879 "did:plc:person-a",
2880 "a.a.a.a.a",
2881 "rkey-aaaaa",
2882 "{}",
2883 Some("rev-aaaaa"),
2884 None,
2885 10_002,
2886 );
2887 write.insert_batch(batch.batch)?;
2888 write.step_rollup()?;
2889
2890 let (
2891 JustCount {
2892 creates,
2893 dids_estimate,
2894 ..
2895 },
2896 children,
2897 cursor,
2898 ) = read.get_prefix(
2899 NsidPrefix::new("a.a.a").unwrap(),
2900 10,
2901 OrderCollectionsBy::Lexi { cursor: None },
2902 None,
2903 None,
2904 )?;
2905 assert_eq!(creates, 2);
2906 assert_eq!(dids_estimate, 1);
2907 assert_eq!(
2908 children,
2909 vec![
2910 PrefixChild::Collection(NsidCount {
2911 nsid: "a.a.a.a".to_string(),
2912 creates: 1,
2913 updates: 0,
2914 deletes: 0,
2915 dids_estimate: 1
2916 }),
2917 PrefixChild::Prefix(PrefixCount {
2918 prefix: "a.a.a.a".to_string(),
2919 creates: 1,
2920 updates: 0,
2921 deletes: 0,
2922 dids_estimate: 1
2923 }),
2924 ]
2925 );
2926 assert_eq!(cursor, None);
2927 Ok(())
2928 }
2929}