1use crate::db_types::{
2 db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
3};
4use crate::error::StorageError;
5use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
6use crate::store_types::{
7 AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
8 DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
9 HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
10 JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
11 NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
12 RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
13 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
14 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
15 WEEK_IN_MICROS,
16};
17use crate::{
18 nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
19 NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
20};
21use async_trait::async_trait;
22use fjall::{
23 Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
24};
25use jetstream::events::Cursor;
26use lsm_tree::AbstractTree;
27use metrics::{
28 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
29};
30use std::collections::{HashMap, HashSet};
31use std::iter::Peekable;
32use std::ops::Bound;
33use std::path::Path;
34use std::sync::{
35 atomic::{AtomicBool, Ordering},
36 Arc,
37};
38use std::time::{Duration, Instant, SystemTime};
39
40const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
41const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;
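// (batching thresholds: account deletes flush their fjall batch every 1024 removed records;
// each rollup step processes at most 256 live-count entries before committing)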
42
///
/// new data format, roughly:
///
/// Partition: 'global'
///
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
///   - key: "js_cursor" (literal)
///   - val: u64
///
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
///   - key: "js_endpoint" (literal)
///   - val: string (URL of the instance)
///
/// - Launch date
///   - key: "takeoff" (literal)
///   - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Cardinality estimator secret
///   - key: "sketch_secret" (literal)
///   - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
///   - key: "rollup_cursor" (literal)
///   - val: u64 (tracks behind js_cursor)
///
/// - Feed trim cursor (bg work: delete oldest excess records)
///   - key: "trim_cursor" || nullstr (nsid)
///   - val: u64 (earliest previously-removed feed entry jetstream cursor)
///
/// Partition: 'feed'
///
/// - Per-collection list of record references ordered by jetstream cursor
///   - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
///   - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
///
///
/// Partition: 'records'
///
/// - Actual records by their atproto location
///   - key: nullstr || nullstr || nullstr (did, collection, rkey)
///   - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
///
///
/// Partition: 'rollups'
///
/// - Live (batched) record counts and dids estimate per collection
///   - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
///
/// - Hourly total record counts and dids estimate per collection
///   - key: "hourly_counts" || u64 || nullstr (hour, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Hourly record count ranking
///   - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
///   - val: [empty]
///
/// - Hourly did estimate ranking
///   - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
///   - val: [empty]
///
///
/// - Weekly total record counts and dids estimate per collection
///   - key: "weekly_counts" || u64 || nullstr (week, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Weekly record count ranking
///   - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
///   - val: [empty]
///
/// - Weekly did estimate ranking
///   - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
///   - val: [empty]
///
///
/// - All-time total record counts and dids estimate per collection
///   - key: "ever_counts" || nullstr (nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - All-time record count ranking
///   - key: "ever_rank_records" || u64 || nullstr (count, nsid)
///   - val: [empty]
///
/// - All-time did estimate ranking
///   - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
///   - val: [empty]
///
///
/// Partition: 'queues'
///
/// - Delete account queue
///   - key: "delete_acount" || u64 (js_cursor)
///   - val: nullstr (did)
///
///
/// (an illustrative sketch of the `nullstr || u64` key layout follows `FjallStorage` below)
///
/// TODO: moderation actions
/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
141#[derive(Debug)]
142pub struct FjallStorage {}
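// A minimal, illustrative sketch of the `nullstr || u64` key layout described in the doc
// comment above, for orientation only. The real encodings live in the key types from
// `store_types`/`db_types` (via `to_db_bytes`); details here -- like the big-endian cursor
// encoding -- are assumptions, not a statement of the actual on-disk format.
#[allow(dead_code)]
fn example_feed_key_layout(collection: &str, js_cursor: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(collection.len() + 1 + 8);
    key.extend_from_slice(collection.as_bytes()); // the collection nsid...
    key.push(0); // ...null-terminated ("nullstr")
    key.extend_from_slice(&js_cursor.to_be_bytes()); // u64 cursor, big-endian so keys sort by time
    key
}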
143
144#[derive(Debug, Default)]
145pub struct FjallConfig {
146 /// drop the db when the storage is dropped
147 ///
148 /// this is only meant for tests
149 #[cfg(test)]
150 pub temp: bool,
151}
152
153impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
154 fn init(
155 path: impl AsRef<Path>,
156 endpoint: String,
157 force_endpoint: bool,
158 _config: FjallConfig,
159 ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
160 let keyspace = {
161 let config = Config::new(path);
162
163 // #[cfg(not(test))]
164 // let config = config.fsync_ms(Some(4_000));
165
166 config.open()?
167 };
168
169 let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
170 let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
171 let records = keyspace.open_partition("records", PartitionCreateOptions::default())?;
172 let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?;
173 let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?;
174
175 let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
176
177 let sketch_secret = if js_cursor.is_some() {
178 let stored_endpoint =
179 get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
180 let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
181 "found cursor but missing js_endpoint, refusing to start.".to_string(),
182 ))?;
183
184 let Some(stored_secret) =
185 get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
186 else {
187 return Err(StorageError::InitError(
188 "found cursor but missing sketch_secret, refusing to start.".to_string(),
189 ));
190 };
191
192 if stored != endpoint {
193 if force_endpoint {
194 log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
195 insert_static_neu::<JetstreamEndpointKey>(
196 &global,
197 JetstreamEndpointValue(endpoint.to_string()),
198 )?;
199 } else {
200 return Err(StorageError::InitError(format!(
201 "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
202 }
203 }
204 stored_secret
205 } else {
206 log::info!("initializing a fresh db!");
207 init_static_neu::<JetstreamEndpointKey>(
208 &global,
209 JetstreamEndpointValue(endpoint.to_string()),
210 )?;
211
212 log::info!("generating new secret for cardinality sketches...");
213 let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
214 getrandom::fill(&mut sketch_secret).map_err(|e| {
215 StorageError::InitError(format!(
216 "failed to get a random secret for cardinality sketches: {e:?}"
217 ))
218 })?;
219 init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
220
221 init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
222 init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
223
224 sketch_secret
225 };
226
227 let reader = FjallReader {
228 keyspace: keyspace.clone(),
229 global: global.clone(),
230 feeds: feeds.clone(),
231 records: records.clone(),
232 rollups: rollups.clone(),
233 queues: queues.clone(),
234 };
235 reader.describe_metrics();
236 let writer = FjallWriter {
237 bg_taken: Arc::new(AtomicBool::new(false)),
238 keyspace,
239 global,
240 feeds,
241 records,
242 rollups,
243 queues,
244 };
245 writer.describe_metrics();
246 Ok((reader, writer, js_cursor, sketch_secret))
247 }
248}
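
// Example shape of a call site for `FjallStorage::init` (illustrative sketch only, not taken
// from the crate's actual callers; the path and endpoint URL here are made up):
//
//     let (reader, writer, resume_cursor, sketch_secret) = FjallStorage::init(
//         "/path/to/db",
//         "wss://jetstream.example".to_string(),
//         false, // force_endpoint
//         FjallConfig::default(),
//     )?;
//     // `resume_cursor` is Some(_) when an existing db (with a stored js_cursor) was found.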
249
250type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>;
251
252#[derive(Clone)]
253pub struct FjallReader {
254 keyspace: Keyspace,
255 global: PartitionHandle,
256 feeds: PartitionHandle,
257 records: PartitionHandle,
258 rollups: PartitionHandle,
259 queues: PartitionHandle,
260}
261
262/// An iterator that knows how to skip over deleted/invalidated records
263struct RecordIterator {
264 db_iter: Box<dyn Iterator<Item = FjallRKV>>,
265 records: PartitionHandle,
266 limit: usize,
267 fetched: usize,
268}
269impl RecordIterator {
270 pub fn new(
271 feeds: &PartitionHandle,
272 records: PartitionHandle,
273 collection: &Nsid,
274 limit: usize,
275 ) -> StorageResult<Self> {
276 let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
277 let db_iter = feeds.prefix(prefix).rev();
278 Ok(Self {
279 db_iter: Box::new(db_iter),
280 records,
281 limit,
282 fetched: 0,
283 })
284 }
285 fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> {
286 let (key_bytes, val_bytes) = db_next?;
287 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
288 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
289 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
290
291 let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
292 // record was deleted (hopefully)
293 return Ok(None);
294 };
295
296 let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
297
298 if meta.cursor() != feed_key.cursor() {
299 // older/different version
300 return Ok(None);
301 }
302 if meta.rev != feed_val.rev() {
303 // weird...
304 log::warn!("record lookup: cursor match but rev did not...? excluding.");
305 return Ok(None);
306 }
307 let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
308 log::warn!(
309 "record lookup: found record but could not get bytes to decode the record??"
310 );
311 return Ok(None);
312 };
313 let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
314 Ok(Some(UFOsRecord {
315 collection: feed_key.collection().clone(),
316 cursor: feed_key.cursor(),
317 did: feed_val.did().clone(),
318 rkey: feed_val.rkey().clone(),
319 rev: meta.rev.to_string(),
320 record: rawval.try_into()?,
321 is_update: meta.is_update,
322 }))
323 }
324}
325impl Iterator for RecordIterator {
326 type Item = StorageResult<Option<UFOsRecord>>;
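    // note on the Item contract: Some(Ok(None)) signals that `limit` records have already been
    // yielded, while a bare None means the underlying feed iterator is exhausted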
327 fn next(&mut self) -> Option<Self::Item> {
328 if self.fetched == self.limit {
329 return Some(Ok(None));
330 }
331 let record = loop {
332 let db_next = self.db_iter.next()?; // None short-circuits here
333 match self.get_record(db_next) {
334 Err(e) => return Some(Err(e)),
335 Ok(Some(record)) => break record,
336 Ok(None) => continue,
337 }
338 };
339 self.fetched += 1;
340 Some(Ok(Some(record)))
341 }
342}
343
344type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
345type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
346type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;
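// the counts are decoded lazily behind a closure: the lexi iterators already hold the value
// bytes and just decode them, while the rank-index iterators must do a second snapshot lookup
// to find the corresponding counts rollup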
347fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
348 snapshot: &Snapshot,
349 start: Bound<Vec<u8>>,
350 end: Bound<Vec<u8>>,
351) -> StorageResult<NsidCounter> {
352 Ok(Box::new(snapshot.range((start, end)).map(|kv| {
353 let (k_bytes, v_bytes) = kv?;
354 let key = db_complete::<T>(&k_bytes)?;
355 let nsid = key.collection().clone();
356 let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
357 Ok((nsid, get_counts))
358 })))
359}
360type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
361fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
362 snapshot: lsm_tree::Snapshot,
363 start: Bound<Vec<u8>>,
364 end: Bound<Vec<u8>>,
365 get_rollup_key: GetRollupKey,
366) -> StorageResult<NsidCounter> {
367 Ok(Box::new(snapshot.range((start, end)).rev().map(
368 move |kv| {
369 let (k_bytes, _) = kv?;
370 let key = db_complete::<T>(&k_bytes)?;
371 let nsid = key.collection().clone();
372 let get_counts: GetCounts = Box::new({
373 let nsid = nsid.clone();
374 let snapshot = snapshot.clone();
375 let get_rollup_key = get_rollup_key.clone();
376 move || {
377 let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
378 "integrity: all-time rank rollup must have corresponding all-time count rollup",
379 );
380 Ok(db_complete::<CountsValue>(&db_count_bytes)?)
381 }
382 });
383 Ok((nsid, get_counts))
384 },
385 )))
386}
387
388type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
389
390impl FjallReader {
391 fn describe_metrics(&self) {
392 describe_gauge!(
393 "storage_fjall_l0_run_count",
394 Unit::Count,
395 "number of L0 runs in a partition"
396 );
397 describe_gauge!(
398 "storage_fjall_keyspace_disk_space",
399 Unit::Bytes,
400 "total storage used according to fjall"
401 );
402 describe_gauge!(
403 "storage_fjall_journal_count",
404 Unit::Count,
405 "total keyspace journals according to fjall"
406 );
407 describe_gauge!(
408 "storage_fjall_keyspace_sequence",
409 Unit::Count,
410 "fjall keyspace sequence"
411 );
412 }
413
414 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
415 let rollup_cursor =
416 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
417 .map(|c| c.to_raw_u64());
418
419 Ok(serde_json::json!({
420 "keyspace_disk_space": self.keyspace.disk_space(),
421 "keyspace_journal_count": self.keyspace.journal_count(),
422 "keyspace_sequence": self.keyspace.instant(),
423 "rollup_cursor": rollup_cursor,
424 }))
425 }
426
427 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
428 let global = self.global.snapshot();
429
430 let endpoint =
431 get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
432 .ok_or(StorageError::BadStateError(
433 "Could not find jetstream endpoint".to_string(),
434 ))?
435 .0;
436
437 let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
438 .ok_or(StorageError::BadStateError(
439 "Could not find jetstream takeoff time".to_string(),
440 ))?
441 .to_raw_u64();
442
443 let latest_cursor =
444 get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
445 .map(|c| c.to_raw_u64());
446
447 let rollup_cursor =
448 get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
449 .map(|c| c.to_raw_u64());
450
451 Ok(ConsumerInfo::Jetstream {
452 endpoint,
453 started_at,
454 latest_cursor,
455 rollup_cursor,
456 })
457 }
458
459 fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
460 let cursor = rollups
461 .unwrap_or(&self.rollups.snapshot())
462 .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
463 .next()
464 .transpose()?
465 .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
466 .transpose()?
467 .map(|key| key.cursor())
468 .unwrap_or_else(|| Cursor::from_start().into());
469 Ok(cursor)
470 }
471
472 fn get_lexi_collections(
473 &self,
474 snapshot: Snapshot,
475 limit: usize,
476 cursor: Option<Vec<u8>>,
477 buckets: Vec<CursorBucket>,
478 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
479 let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
480 let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
481 for bucket in &buckets {
482 let it: NsidCounter = match bucket {
483 CursorBucket::Hour(t) => {
484 let start = cursor_nsid
485 .as_ref()
486 .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
487 .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
488 let end = HourlyRollupKey::end(*t)?;
489 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
490 }
491 CursorBucket::Week(t) => {
492 let start = cursor_nsid
493 .as_ref()
494 .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
495 .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
496 let end = WeeklyRollupKey::end(*t)?;
497 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
498 }
499 CursorBucket::AllTime => {
500 let start = cursor_nsid
501 .as_ref()
502 .map(AllTimeRollupKey::after_nsid)
503 .unwrap_or_else(AllTimeRollupKey::start)?;
504 let end = AllTimeRollupKey::end()?;
505 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
506 }
507 };
508 iters.push(it.peekable());
509 }
510
511 let mut out = Vec::new();
512 let mut current_nsid = None;
513 for _ in 0..limit {
514 // double-scan the iters for each element: this could be eliminated but we're starting simple.
515 // first scan: find the lowest nsid
516 // second scan: take + merge, and advance all iters with lowest nsid
517 let mut lowest: Option<Nsid> = None;
518 for iter in &mut iters {
519 if let Some(bla) = iter.peek_mut() {
520 let (nsid, _) = match bla {
521 Ok(v) => v,
522 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
523 };
524 lowest = match lowest {
525 Some(ref current) if nsid.as_str() > current.as_str() => lowest,
526 _ => Some(nsid.clone()),
527 };
528 }
529 }
530 current_nsid = lowest.clone();
531 let Some(nsid) = lowest else { break };
532
533 let mut merged = CountsValue::default();
534 for iter in &mut iters {
535 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
536 if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
537 let counts = get_counts()?;
538 merged.merge(&counts);
539 }
540 }
541 out.push(NsidCount::new(&nsid, &merged));
542 }
543
544 let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
545 Ok((out, next_cursor))
546 }
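
    // A self-contained sketch (std only, hypothetical helper) of the "double-scan" merge used by
    // `get_lexi_collections` above and `get_lexi_prefix` below: each round, peek every iterator
    // to find the lowest key, then advance every iterator sitting on that key and combine their
    // values. Simplified to plain (String, u64) pairs; the real code additionally carries
    // fallible decoding and lazy count lookups.
    #[allow(dead_code)]
    fn example_double_scan_merge(
        mut iters: Vec<Peekable<std::vec::IntoIter<(String, u64)>>>,
    ) -> Vec<(String, u64)> {
        let mut out = Vec::new();
        loop {
            // first scan: find the lowest key across all iterators
            let Some(lowest) = iters
                .iter_mut()
                .filter_map(|it| it.peek().map(|(k, _)| k.clone()))
                .min()
            else {
                break;
            };
            // second scan: advance all iterators positioned at that key, merging as we go
            let mut merged = 0u64;
            for it in &mut iters {
                while let Some((_, v)) = it.next_if(|(k, _)| *k == lowest) {
                    merged += v;
                }
            }
            out.push((lowest, merged));
        }
        out
    }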
547
548 fn get_ordered_collections(
549 &self,
550 snapshot: Snapshot,
551 limit: usize,
552 order: OrderCollectionsBy,
553 buckets: Vec<CursorBucket>,
554 ) -> StorageResult<Vec<NsidCount>> {
555 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
556
557 for bucket in buckets {
558 let it: NsidCounter = match (&order, bucket) {
559 (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
560 get_lookup_iter::<HourlyRecordsKey>(
561 snapshot.clone(),
562 HourlyRecordsKey::start(t)?,
563 HourlyRecordsKey::end(t)?,
564 Arc::new({
565 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
566 }),
567 )?
568 }
569 (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
570 get_lookup_iter::<HourlyDidsKey>(
571 snapshot.clone(),
572 HourlyDidsKey::start(t)?,
573 HourlyDidsKey::end(t)?,
574 Arc::new({
575 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
576 }),
577 )?
578 }
579 (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
580 get_lookup_iter::<WeeklyRecordsKey>(
581 snapshot.clone(),
582 WeeklyRecordsKey::start(t)?,
583 WeeklyRecordsKey::end(t)?,
584 Arc::new({
585 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
586 }),
587 )?
588 }
589 (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
590 get_lookup_iter::<WeeklyDidsKey>(
591 snapshot.clone(),
592 WeeklyDidsKey::start(t)?,
593 WeeklyDidsKey::end(t)?,
594 Arc::new({
595 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
596 }),
597 )?
598 }
599 (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
600 get_lookup_iter::<AllTimeRecordsKey>(
601 snapshot.clone(),
602 AllTimeRecordsKey::start()?,
603 AllTimeRecordsKey::end()?,
604 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
605 )?
606 }
607 (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
608 get_lookup_iter::<AllTimeDidsKey>(
609 snapshot.clone(),
610 AllTimeDidsKey::start()?,
611 AllTimeDidsKey::end()?,
612 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
613 )?
614 }
615 (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
616 };
617 iters.push(it);
618 }
619
        // overfetch by taking a bit more than the limit, merge by collection,
        // then sort by the requested order, take `limit`, and discard the rest
        // (overfetch factor is 1.3, e.g. limit=50 pulls up to 65 entries per bucket iterator).
        //
        // this isn't guaranteed to be correct, but it should be close most of the time:
        // - an NSID might score low in some time-buckets and miss being merged at all
        // - overfetching helps by catching NSIDs near the threshold more often, but it's no guarantee
        //
        // this is heavy; there's probably a better way
629 let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
630 for iter in iters {
631 for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
632 let (nsid, get_counts) = pair?;
633 let counts = get_counts()?;
634 ranked.entry(nsid).or_default().merge(&counts);
635 }
636 }
637 let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
638 match order {
639 OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
640 OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
641 OrderCollectionsBy::Lexi { .. } => unreachable!(),
642 }
643 let counts = ranked
644 .into_iter()
645 .rev()
646 .take(limit)
647 .map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
648 .collect();
649 Ok(counts)
650 }
651
652 fn get_collections(
653 &self,
654 limit: usize,
655 order: OrderCollectionsBy,
656 since: Option<HourTruncatedCursor>,
657 until: Option<HourTruncatedCursor>,
658 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
659 let snapshot = self.rollups.snapshot();
660 let buckets = if let (None, None) = (since, until) {
661 vec![CursorBucket::AllTime]
662 } else {
663 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
664 if let Some(specified) = since {
665 if specified > lower {
666 lower = specified;
667 }
668 }
669 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
670 CursorBucket::buckets_spanning(lower, upper)
671 };
672 match order {
673 OrderCollectionsBy::Lexi { cursor } => {
674 self.get_lexi_collections(snapshot, limit, cursor, buckets)
675 }
676 _ => Ok((
677 self.get_ordered_collections(snapshot, limit, order, buckets)?,
678 None,
679 )),
680 }
681 }
682
683 fn get_lexi_prefix(
684 &self,
685 snapshot: Snapshot,
686 prefix: NsidPrefix,
687 limit: usize,
688 cursor: Option<Vec<u8>>,
689 buckets: Vec<CursorBucket>,
690 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
691 // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
692 let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
693 let cursor_child = cursor
694 .as_deref()
695 .map(|encoded_bytes| {
696 let decoded: String = db_complete(encoded_bytes)?;
697 // TODO: write some tests for cursors, there's probably bugs here
698 let as_sub_prefix_with_null = decoded.to_db_bytes()?;
699 Ok::<_, EncodingError>(as_sub_prefix_with_null)
700 })
701 .transpose()?;
702 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
703 for bucket in &buckets {
704 let it: NsidCounter = match bucket {
705 CursorBucket::Hour(t) => {
706 let start = cursor_child
707 .as_ref()
708 .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
709 .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
710 let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
711 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
712 }
713 CursorBucket::Week(t) => {
714 let start = cursor_child
715 .as_ref()
716 .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
717 .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
718 let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
719 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
720 }
721 CursorBucket::AllTime => {
722 let start = cursor_child
723 .as_ref()
724 .map(|child| AllTimeRollupKey::after_nsid_prefix(child))
725 .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
726 let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
727 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
728 }
729 };
730 iters.push(it);
731 }
732
733 // with apologies
734 let mut iters: Vec<_> = iters
735 .into_iter()
736 .map(|it| {
737 it.map(|bla| {
738 bla.map(|(nsid, v)| {
739 let Some(child) = Child::from_prefix(&nsid, &prefix) else {
740 panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
741 };
742 (child, v)
743 })
744 })
745 .peekable()
746 })
747 .collect();
748
749 let mut items = Vec::new();
750 let mut prefix_count = CountsValue::default();
751 #[derive(Debug, Clone, PartialEq)]
752 enum Child {
753 FullNsid(Nsid),
754 ChildPrefix(String),
755 }
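        // Child::FullNsid is an nsid directly in the queried prefix's group (e.g. `app.bsky.feed.post`
        // under the prefix `app.bsky.feed`); Child::ChildPrefix groups everything one segment deeper
        // (e.g. `app.bsky.feed.*` shows up as the child prefix `app.bsky.feed` under `app.bsky`)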
756 impl Child {
757 fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
758 if prefix.is_group_of(nsid) {
759 return Some(Child::FullNsid(nsid.clone()));
760 }
761 let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
762 let (segment, _) = suffix.split_once('.').unwrap();
763 let child_prefix = format!("{}.{segment}", prefix.0);
764 Some(Child::ChildPrefix(child_prefix))
765 }
766 fn is_before(&self, other: &Child) -> bool {
767 match (self, other) {
768 (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
769 (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
770 (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
771 (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
772 (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
773 (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
774 }
775 }
776 fn into_inner(self) -> String {
777 match self {
778 Child::FullNsid(s) => s.to_string(),
779 Child::ChildPrefix(s) => s,
780 }
781 }
782 }
783 let mut current_child: Option<Child> = None;
784 for _ in 0..limit {
785 // double-scan the iters for each element: this could be eliminated but we're starting simple.
786 // first scan: find the lowest nsid
787 // second scan: take + merge, and advance all iters with lowest nsid
788 let mut lowest: Option<Child> = None;
789 for iter in &mut iters {
790 if let Some(bla) = iter.peek_mut() {
791 let (child, _) = match bla {
792 Ok(v) => v,
793 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
794 };
795
796 lowest = match lowest {
797 Some(ref current) if current.is_before(child) => lowest,
798 _ => Some(child.clone()),
799 };
800 }
801 }
802 current_child = lowest.clone();
803 let Some(child) = lowest else { break };
804
805 let mut merged = CountsValue::default();
806 for iter in &mut iters {
807 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
808 while let Some(Ok((_, get_counts))) =
809 iter.next_if(|v| v.as_ref().unwrap().0 == child)
810 {
811 let counts = get_counts()?;
812 prefix_count.merge(&counts);
813 merged.merge(&counts);
814 }
815 }
816 items.push(match child {
817 Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
818 Child::ChildPrefix(prefix) => {
819 PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
820 }
821 });
822 }
823
824 // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
825 // ....er the sketch is probably too big
826 // TODO: this is probably buggy on child-type boundaries bleh
827 let next_cursor = current_child
828 .map(|s| s.into_inner().to_db_bytes())
829 .transpose()?;
830
831 Ok(((&prefix_count).into(), items, next_cursor))
832 }
833
834 fn get_prefix(
835 &self,
836 prefix: NsidPrefix,
837 limit: usize,
838 order: OrderCollectionsBy,
839 since: Option<HourTruncatedCursor>,
840 until: Option<HourTruncatedCursor>,
841 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
842 let snapshot = self.rollups.snapshot();
843 let buckets = if let (None, None) = (since, until) {
844 vec![CursorBucket::AllTime]
845 } else {
846 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
847 if let Some(specified) = since {
848 if specified > lower {
849 lower = specified;
850 }
851 }
852 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
853 CursorBucket::buckets_spanning(lower, upper)
854 };
855 match order {
856 OrderCollectionsBy::Lexi { cursor } => {
857 self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
858 }
859 _ => todo!(),
860 }
861 }
862
863 /// - step: output series time step, in seconds
864 fn get_timeseries(
865 &self,
866 collections: Vec<Nsid>,
867 since: HourTruncatedCursor,
868 until: Option<HourTruncatedCursor>,
869 step: u64,
870 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
871 if step > WEEK_IN_MICROS {
872 panic!("week-stepping is todo");
873 }
874 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
875 let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
876 return Ok((
877 // empty: until < since
878 vec![],
879 collections.into_iter().map(|c| (c, vec![])).collect(),
880 ));
881 };
882 let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
883 let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
884 let snapshot = self.rollups.snapshot();
885 for hour in (0..n_hours).map(|i| since.nth_next(i)) {
886 let mut counts = Vec::with_capacity(collections.len());
887 for nsid in &collections {
888 let count = snapshot
889 .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
890 .as_deref()
891 .map(db_complete::<CountsValue>)
892 .transpose()?
893 .unwrap_or_default();
894 counts.push(count);
895 }
896 counts_by_hour.push((hour, counts));
897 }
898
899 let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
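        // e.g. step = 86_400 (one day, in seconds) -> step_hours = 24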
900 let mut output_hours = Vec::with_capacity(step_hours as usize);
901 let mut output_series: CollectionSerieses = collections
902 .iter()
903 .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
904 .collect();
905
906 for chunk in counts_by_hour.chunks(step_hours as usize) {
            output_hours.push(chunk[0].0); // chunks() never yields an empty chunk, so indexing [0] is safe
908 for (i, collection) in collections.iter().enumerate() {
909 let mut c = CountsValue::default();
910 for (_, counts) in chunk {
911 c.merge(&counts[i]);
912 }
913 output_series
914 .get_mut(collection)
915 .expect("output series is initialized with all collections")
916 .push(c);
917 }
918 }
919
920 Ok((output_hours, output_series))
921 }
922
923 fn get_collection_counts(
924 &self,
925 collection: &Nsid,
926 since: HourTruncatedCursor,
927 until: Option<HourTruncatedCursor>,
928 ) -> StorageResult<JustCount> {
929 // grab snapshots in case rollups happen while we're working
930 let rollups = self.rollups.snapshot();
931
932 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
933 let buckets = CursorBucket::buckets_spanning(since, until);
934 let mut total_counts = CountsValue::default();
935
936 for bucket in buckets {
937 let key = match bucket {
938 CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
939 CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
940 CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
941 };
942 let count = rollups
943 .get(&key)?
944 .as_deref()
945 .map(db_complete::<CountsValue>)
946 .transpose()?
947 .unwrap_or_default();
948 total_counts.merge(&count);
949 }
950
951 Ok((&total_counts).into())
952 }
953
954 fn get_records_by_collections(
955 &self,
956 collections: HashSet<Nsid>,
957 limit: usize,
958 expand_each_collection: bool,
959 ) -> StorageResult<Vec<UFOsRecord>> {
960 if collections.is_empty() {
961 return Ok(vec![]);
962 }
963 let mut record_iterators = Vec::new();
964 for collection in collections {
965 let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
966 record_iterators.push(iter.peekable());
967 }
968 let mut merged = Vec::new();
969 loop {
970 let mut latest: Option<(Cursor, usize)> = None; // ugh
971 for (i, iter) in record_iterators.iter_mut().enumerate() {
972 let Some(it) = iter.peek_mut() else {
973 continue;
974 };
975 let it = match it {
976 Ok(v) => v,
977 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
978 };
979 let Some(rec) = it else {
980 if expand_each_collection {
981 continue;
982 } else {
983 break;
984 }
985 };
986 if let Some((cursor, _)) = latest {
987 if rec.cursor > cursor {
988 latest = Some((rec.cursor, i))
989 }
990 } else {
991 latest = Some((rec.cursor, i));
992 }
993 }
994 let Some((_, idx)) = latest else {
995 break;
996 };
            // unwrap chain is safe: we just peeked Some(Ok(Some(_))) at idx, so next() yields that same item
            merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
999 }
1000 Ok(merged)
1001 }
1002
1003 fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
1004 let start = AllTimeRollupKey::start()?;
1005 let end = AllTimeRollupKey::end()?;
1006 let mut matches = Vec::new();
1007 let limit = 16; // TODO: param
1008 for kv in self.rollups.range((start, end)) {
1009 let (key_bytes, val_bytes) = kv?;
1010 let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
1011 let nsid = key.collection();
1012 for term in &terms {
1013 if nsid.contains(term) {
1014 let counts = db_complete::<CountsValue>(&val_bytes)?;
1015 matches.push(NsidCount::new(nsid, &counts));
1016 break;
1017 }
1018 }
1019 if matches.len() >= limit {
1020 break;
1021 }
1022 }
1023 // TODO: indicate incomplete results
1024 Ok(matches)
1025 }
1026}
1027
1028#[async_trait]
1029impl StoreReader for FjallReader {
1030 fn name(&self) -> String {
1031 "fjall storage v2".into()
1032 }
1033 fn update_metrics(&self) {
1034 gauge!("storage_fjall_l0_run_count", "partition" => "global")
1035 .set(self.global.tree.l0_run_count() as f64);
1036 gauge!("storage_fjall_l0_run_count", "partition" => "feeds")
1037 .set(self.feeds.tree.l0_run_count() as f64);
1038 gauge!("storage_fjall_l0_run_count", "partition" => "records")
1039 .set(self.records.tree.l0_run_count() as f64);
1040 gauge!("storage_fjall_l0_run_count", "partition" => "rollups")
1041 .set(self.rollups.tree.l0_run_count() as f64);
1042 gauge!("storage_fjall_l0_run_count", "partition" => "queues")
1043 .set(self.queues.tree.l0_run_count() as f64);
1044 gauge!("storage_fjall_keyspace_disk_space").set(self.keyspace.disk_space() as f64);
1045 gauge!("storage_fjall_journal_count").set(self.keyspace.journal_count() as f64);
1046 gauge!("storage_fjall_keyspace_sequence").set(self.keyspace.instant() as f64);
1047 }
1048 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1049 let s = self.clone();
1050 tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await?
1051 }
1052 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
1053 let s = self.clone();
1054 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
1055 }
1056 async fn get_collections(
1057 &self,
1058 limit: usize,
1059 order: OrderCollectionsBy,
1060 since: Option<HourTruncatedCursor>,
1061 until: Option<HourTruncatedCursor>,
1062 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
1063 let s = self.clone();
1064 tokio::task::spawn_blocking(move || {
1065 FjallReader::get_collections(&s, limit, order, since, until)
1066 })
1067 .await?
1068 }
1069 async fn get_prefix(
1070 &self,
1071 prefix: NsidPrefix,
1072 limit: usize,
1073 order: OrderCollectionsBy,
1074 since: Option<HourTruncatedCursor>,
1075 until: Option<HourTruncatedCursor>,
1076 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
1077 let s = self.clone();
1078 tokio::task::spawn_blocking(move || {
1079 FjallReader::get_prefix(&s, prefix, limit, order, since, until)
1080 })
1081 .await?
1082 }
1083 async fn get_timeseries(
1084 &self,
1085 collections: Vec<Nsid>,
1086 since: HourTruncatedCursor,
1087 until: Option<HourTruncatedCursor>,
1088 step: u64,
1089 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
1090 let s = self.clone();
1091 tokio::task::spawn_blocking(move || {
1092 FjallReader::get_timeseries(&s, collections, since, until, step)
1093 })
1094 .await?
1095 }
1096 async fn get_collection_counts(
1097 &self,
1098 collection: &Nsid,
1099 since: HourTruncatedCursor,
1100 until: Option<HourTruncatedCursor>,
1101 ) -> StorageResult<JustCount> {
1102 let s = self.clone();
1103 let collection = collection.clone();
1104 tokio::task::spawn_blocking(move || {
1105 FjallReader::get_collection_counts(&s, &collection, since, until)
1106 })
1107 .await?
1108 }
1109 async fn get_records_by_collections(
1110 &self,
1111 collections: HashSet<Nsid>,
1112 limit: usize,
1113 expand_each_collection: bool,
1114 ) -> StorageResult<Vec<UFOsRecord>> {
1115 let s = self.clone();
1116 tokio::task::spawn_blocking(move || {
1117 FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
1118 })
1119 .await?
1120 }
1121 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
1122 let s = self.clone();
1123 tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
1124 }
1125}
1126
1127#[derive(Clone)]
1128pub struct FjallWriter {
1129 bg_taken: Arc<AtomicBool>,
1130 keyspace: Keyspace,
1131 global: PartitionHandle,
1132 feeds: PartitionHandle,
1133 records: PartitionHandle,
1134 rollups: PartitionHandle,
1135 queues: PartitionHandle,
1136}
1137
1138impl FjallWriter {
1139 fn describe_metrics(&self) {
1140 describe_histogram!(
1141 "storage_insert_batch_db_batch_items",
1142 Unit::Count,
1143 "how many items are in the fjall batch for batched inserts"
1144 );
1145 describe_histogram!(
1146 "storage_insert_batch_db_batch_size",
1147 Unit::Count,
1148 "in-memory size of the fjall batch for batched inserts"
1149 );
1150 describe_histogram!(
1151 "storage_rollup_counts_db_batch_items",
1152 Unit::Count,
1153 "how many items are in the fjall batch for a timlies rollup"
1154 );
1155 describe_histogram!(
1156 "storage_rollup_counts_db_batch_size",
1157 Unit::Count,
1158 "in-memory size of the fjall batch for a timelies rollup"
1159 );
1160 describe_counter!(
1161 "storage_delete_account_partial_commits",
1162 Unit::Count,
1163 "fjall checkpoint commits for cleaning up accounts with too many records"
1164 );
1165 describe_counter!(
1166 "storage_delete_account_completions",
1167 Unit::Count,
1168 "total count of account deletes handled"
1169 );
1170 describe_counter!(
1171 "storage_delete_account_records_deleted",
1172 Unit::Count,
1173 "total records deleted when handling account deletes"
1174 );
1175 describe_histogram!(
1176 "storage_trim_dirty_nsids",
1177 Unit::Count,
1178 "number of NSIDs trimmed"
1179 );
1180 describe_histogram!(
1181 "storage_trim_duration",
1182 Unit::Microseconds,
1183 "how long it took to trim the dirty NSIDs"
1184 );
1185 describe_counter!(
1186 "storage_trim_removed",
1187 Unit::Count,
1188 "how many records were removed during trim"
1189 );
1190 }
1191 fn rollup_delete_account(
1192 &mut self,
1193 cursor: Cursor,
1194 key_bytes: &[u8],
1195 val_bytes: &[u8],
1196 ) -> StorageResult<usize> {
1197 let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
1198 self.delete_account(&did)?;
1199 let mut batch = self.keyspace.batch();
1200 batch.remove(&self.queues, key_bytes);
1201 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
1202 batch.commit()?;
1203 Ok(1)
1204 }
1205
1206 fn rollup_live_counts(
1207 &mut self,
1208 timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
1209 cursor_exclusive_limit: Option<Cursor>,
1210 rollup_limit: usize,
1211 ) -> StorageResult<(usize, HashSet<Nsid>)> {
        // current strategy is to buffer counts in memory before writing the rollups.
        // we *could* read+write every single batch straight into the rollups, but the merge is
        // associative, so buffering saves the db some work up front. is it worth it? who knows...
1215
1216 let mut dirty_nsids = HashSet::new();
1217
1218 #[derive(Eq, Hash, PartialEq)]
1219 enum Rollup {
1220 Hourly(HourTruncatedCursor),
1221 Weekly(WeekTruncatedCursor),
1222 AllTime,
1223 }
1224
1225 let mut batch = self.keyspace.batch();
1226 let mut cursors_advanced = 0;
1227 let mut last_cursor = Cursor::from_start();
1228 let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();
1229
1230 for (i, kv) in timelies.enumerate() {
1231 if i >= rollup_limit {
1232 break;
1233 }
1234
1235 let (key_bytes, val_bytes) = kv?;
1236 let key = db_complete::<LiveCountsKey>(&key_bytes)?;
1237
1238 if cursor_exclusive_limit
1239 .map(|limit| key.cursor() > limit)
1240 .unwrap_or(false)
1241 {
1242 break;
1243 }
1244
1245 dirty_nsids.insert(key.collection().clone());
1246
1247 batch.remove(&self.rollups, key_bytes);
1248 let val = db_complete::<CountsValue>(&val_bytes)?;
1249 counts_by_rollup
1250 .entry((
1251 key.collection().clone(),
1252 Rollup::Hourly(key.cursor().into()),
1253 ))
1254 .or_default()
1255 .merge(&val);
1256 counts_by_rollup
1257 .entry((
1258 key.collection().clone(),
1259 Rollup::Weekly(key.cursor().into()),
1260 ))
1261 .or_default()
1262 .merge(&val);
1263 counts_by_rollup
1264 .entry((key.collection().clone(), Rollup::AllTime))
1265 .or_default()
1266 .merge(&val);
1267
1268 cursors_advanced += 1;
1269 last_cursor = key.cursor();
1270 }
1271
1272 // go through each new rollup thing and merge it with whatever might already be in the db
1273 for ((nsid, rollup), counts) in counts_by_rollup {
1274 let rollup_key_bytes = match rollup {
1275 Rollup::Hourly(hourly_cursor) => {
1276 HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
1277 }
1278 Rollup::Weekly(weekly_cursor) => {
1279 WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
1280 }
1281 Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
1282 };
1283 let mut rolled: CountsValue = self
1284 .rollups
1285 .get(&rollup_key_bytes)?
1286 .as_deref()
1287 .map(db_complete::<CountsValue>)
1288 .transpose()?
1289 .unwrap_or_default();
1290
            // now that we have the stored values, we know the existing ranks
1292 let before_creates_count = rolled.counts().creates;
1293 let before_dids_estimate = rolled.dids().estimate() as u64;
1294
1295 // update the rollup
1296 rolled.merge(&counts);
1297
1298 // new ranks
1299 let new_creates_count = rolled.counts().creates;
1300 let new_dids_estimate = rolled.dids().estimate() as u64;
1301
1302 // update create-ranked secondary index if rank changed
1303 if new_creates_count != before_creates_count {
1304 let (old_k, new_k) = match rollup {
1305 Rollup::Hourly(cursor) => (
1306 HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1307 .to_db_bytes()?,
1308 HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1309 .to_db_bytes()?,
1310 ),
1311 Rollup::Weekly(cursor) => (
1312 WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1313 .to_db_bytes()?,
1314 WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1315 .to_db_bytes()?,
1316 ),
1317 Rollup::AllTime => (
1318 AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
1319 AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
1320 ),
1321 };
1322 // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
1323 batch.remove_weak(&self.rollups, &old_k);
1324 batch.insert(&self.rollups, &new_k, "");
1325 }
1326
1327 // update dids-ranked secondary index if rank changed
1328 if new_dids_estimate != before_dids_estimate {
1329 let (old_k, new_k) = match rollup {
1330 Rollup::Hourly(cursor) => (
1331 HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1332 .to_db_bytes()?,
1333 HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1334 .to_db_bytes()?,
1335 ),
1336 Rollup::Weekly(cursor) => (
1337 WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1338 .to_db_bytes()?,
1339 WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1340 .to_db_bytes()?,
1341 ),
1342 Rollup::AllTime => (
1343 AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
1344 AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
1345 ),
1346 };
1347 // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
1348 batch.remove_weak(&self.rollups, &old_k);
1349 batch.insert(&self.rollups, &new_k, "");
1350 }
1351
1352 // replace the main counts rollup
1353 batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
1354 }
1355
1356 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
1357
1358 histogram!("storage_rollup_counts_db_batch_items").record(batch.len() as f64);
1359 histogram!("storage_rollup_counts_db_batch_size")
1360 .record(std::mem::size_of_val(&batch) as f64);
1361 batch.commit()?;
1362 Ok((cursors_advanced, dirty_nsids))
1363 }
1364}
1365
1366impl StoreWriter<FjallBackground> for FjallWriter {
1367 fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
1368 if self.bg_taken.swap(true, Ordering::SeqCst) {
1369 return Err(StorageError::BackgroundAlreadyStarted);
1370 }
1371 if reroll {
1372 log::info!("reroll: resetting rollup cursor...");
1373 insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
1374 log::info!("reroll: clearing trim cursors...");
1375 let mut batch = self.keyspace.batch();
1376 for kv in self
1377 .global
1378 .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
1379 &Default::default(),
1380 )?)
1381 {
1382 let (k, _) = kv?;
1383 batch.remove(&self.global, k);
1384 }
1385 let n = batch.len();
1386 batch.commit()?;
1387 log::info!("reroll: cleared {n} trim cursors.");
1388 }
1389 Ok(FjallBackground(self.clone()))
1390 }
1391
1392 fn insert_batch<const LIMIT: usize>(
1393 &mut self,
1394 event_batch: EventBatch<LIMIT>,
1395 ) -> StorageResult<()> {
1396 if event_batch.is_empty() {
1397 return Ok(());
1398 }
1399
1400 let mut batch = self.keyspace.batch();
1401
1402 // would be nice not to have to iterate everything at once here
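        // unwrap: the batch is non-empty (checked above), so there is a latest cursor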
1403 let latest = event_batch.latest_cursor().unwrap();
1404
1405 for (nsid, commits) in event_batch.commits_by_nsid {
1406 for commit in commits.commits {
1407 let location_key: RecordLocationKey = (&commit, &nsid).into();
1408
1409 match commit.action {
1410 CommitAction::Cut => {
1411 batch.remove(&self.records, &location_key.to_db_bytes()?);
1412 }
1413 CommitAction::Put(put_action) => {
1414 let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
1415 let feed_val: NsidRecordFeedVal =
1416 (&commit.did, &commit.rkey, commit.rev.as_str()).into();
1417 batch.insert(
1418 &self.feeds,
1419 feed_key.to_db_bytes()?,
1420 feed_val.to_db_bytes()?,
1421 );
1422
1423 let location_val: RecordLocationVal =
1424 (commit.cursor, commit.rev.as_str(), put_action).into();
1425 batch.insert(
1426 &self.records,
1427 &location_key.to_db_bytes()?,
1428 &location_val.to_db_bytes()?,
1429 );
1430 }
1431 }
1432 }
1433 let live_counts_key: LiveCountsKey = (latest, &nsid).into();
1434 let counts_value = CountsValue::new(
1435 CommitCounts {
1436 creates: commits.creates as u64,
1437 updates: commits.updates as u64,
1438 deletes: commits.deletes as u64,
1439 },
1440 commits.dids_estimate,
1441 );
1442 batch.insert(
1443 &self.rollups,
1444 &live_counts_key.to_db_bytes()?,
1445 &counts_value.to_db_bytes()?,
1446 );
1447 }
1448
1449 for remove in event_batch.account_removes {
1450 let queue_key = DeleteAccountQueueKey::new(remove.cursor);
1451 let queue_val: DeleteAccountQueueVal = remove.did;
1452 batch.insert(
1453 &self.queues,
1454 &queue_key.to_db_bytes()?,
1455 &queue_val.to_db_bytes()?,
1456 );
1457 }
1458
1459 batch.insert(
1460 &self.global,
1461 DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
1462 latest.to_db_bytes()?,
1463 );
1464
1465 histogram!("storage_insert_batch_db_batch_items").record(batch.len() as f64);
1466 histogram!("storage_insert_batch_db_batch_size")
1467 .record(std::mem::size_of_val(&batch) as f64);
1468 batch.commit()?;
1469 Ok(())
1470 }
1471
1472 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
1473 let mut dirty_nsids = HashSet::new();
1474
1475 let rollup_cursor =
1476 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
1477 StorageError::BadStateError("Could not find current rollup cursor".to_string()),
1478 )?;
1479
1480 // timelies
1481 let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
1482 let mut timely_iter = self.rollups.range(live_counts_range).peekable();
1483
1484 let timely_next = timely_iter
1485 .peek_mut()
1486 .map(|kv| -> StorageResult<LiveCountsKey> {
1487 match kv {
1488 Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
1489 Ok((key_bytes, _)) => {
1490 let key = db_complete::<LiveCountsKey>(key_bytes)?;
1491 Ok(key)
1492 }
1493 }
1494 })
1495 .transpose()?;
1496
1497 // delete accounts
1498 let delete_accounts_range =
1499 DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
1500
1501 let next_delete = self
1502 .queues
1503 .range(delete_accounts_range)
1504 .next()
1505 .transpose()?
1506 .map(|(key_bytes, val_bytes)| {
1507 db_complete::<DeleteAccountQueueKey>(&key_bytes)
1508 .map(|k| (k.suffix, key_bytes, val_bytes))
1509 })
1510 .transpose()?;
1511
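        // handle whichever pending item comes first in cursor order, so live-count rollups and
        // account deletes are applied as a single, totally ordered stream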
1512 let cursors_stepped = match (timely_next, next_delete) {
1513 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1514 if timely.cursor() < delete_cursor {
1515 let (n, dirty) = self.rollup_live_counts(
1516 timely_iter,
1517 Some(delete_cursor),
1518 MAX_BATCHED_ROLLUP_COUNTS,
1519 )?;
1520 dirty_nsids.extend(dirty);
1521 n
1522 } else {
1523 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1524 }
1525 }
1526 (Some(_), None) => {
1527 let (n, dirty) =
1528 self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?;
1529 dirty_nsids.extend(dirty);
1530 n
1531 }
1532 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1533 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1534 }
1535 (None, None) => 0,
1536 };
1537
1538 Ok((cursors_stepped, dirty_nsids))
1539 }
1540
1541 fn trim_collection(
1542 &mut self,
1543 collection: &Nsid,
1544 limit: usize,
1545 full_scan: bool,
1546 ) -> StorageResult<(usize, usize, bool)> {
1547 let mut dangling_feed_keys_cleaned = 0;
1548 let mut records_deleted = 0;
1549
1550 let live_range = if full_scan {
1551 let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
1552 let end = NsidRecordFeedKey::prefix_range_end(collection)?;
1553 start..end
1554 } else {
1555 let feed_trim_cursor_key =
1556 TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
1557 let trim_cursor = self
1558 .global
1559 .get(&feed_trim_cursor_key)?
1560 .map(|value_bytes| db_complete(&value_bytes))
1561 .transpose()?
1562 .unwrap_or(Cursor::from_start());
1563 NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
1564 };
1565
1566 let mut live_records_found = 0;
1567 let mut candidate_new_feed_lower_cursor = None;
1568 let ended_early = false;
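        // note: nothing below ever sets this (it isn't `mut`), so trims currently always report a full pass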
1569 let mut current_cursor: Option<Cursor> = None;
1570 for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
1571 if i > 0 && i % 500_000 == 0 {
1572 log::info!(
1573 "trim: at {i} for {:?} (now at {})",
1574 collection.to_string(),
1575 current_cursor
1576 .map(|c| c
1577 .elapsed()
1578 .map(nice_duration)
1579 .unwrap_or("[not past]".into()))
1580 .unwrap_or("??".into()),
1581 );
1582 }
1583 let (key_bytes, val_bytes) = kv?;
1584 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
1585 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
1586 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
1587 let location_key_bytes = location_key.to_db_bytes()?;
1588
1589 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
1590 // record was deleted (hopefully)
1591 self.feeds.remove(&*key_bytes)?;
1592 dangling_feed_keys_cleaned += 1;
1593 continue;
1594 };
1595
1596 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
1597 current_cursor = Some(meta.cursor());
1598
1599 if meta.cursor() != feed_key.cursor() {
1600 // older/different version
1601 self.feeds.remove(&*key_bytes)?;
1602 dangling_feed_keys_cleaned += 1;
1603 continue;
1604 }
1605 if meta.rev != feed_val.rev() {
1606 // weird...
1607 log::warn!("record lookup: cursor match but rev did not...? removing.");
1608 self.records.remove(&location_key_bytes)?;
1609 self.feeds.remove(&*key_bytes)?;
1610 dangling_feed_keys_cleaned += 1;
1611 continue;
1612 }
1613
1614 live_records_found += 1;
1615 if live_records_found <= limit {
1616 continue;
1617 }
1618 if candidate_new_feed_lower_cursor.is_none() {
1619 candidate_new_feed_lower_cursor = Some(feed_key.cursor());
1620 }
1621
1622 self.feeds.remove(&location_key_bytes)?;
1623 self.feeds.remove(key_bytes)?;
1624 records_deleted += 1;
1625 }
1626
1627 if !ended_early {
1628 if let Some(new_cursor) = candidate_new_feed_lower_cursor {
1629 self.global.insert(
1630 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
1631 &new_cursor.to_db_bytes()?,
1632 )?;
1633 }
1634 }
1635
1636 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
1637 Ok((dangling_feed_keys_cleaned, records_deleted, ended_early))
1638 }
1639
1640 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
1641 let mut records_deleted = 0;
1642 let mut batch = self.keyspace.batch();
1643 let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?;
1644 for kv in self.records.prefix(prefix) {
1645 let (key_bytes, _) = kv?;
1646 batch.remove(&self.records, key_bytes);
1647 records_deleted += 1;
1648 if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
1649 counter!("storage_delete_account_partial_commits").increment(1);
1650 batch.commit()?;
1651 batch = self.keyspace.batch();
1652 }
1653 }
1654 counter!("storage_delete_account_completions").increment(1);
1655 counter!("storage_delete_account_records_deleted").increment(records_deleted as u64);
1656 batch.commit()?;
1657 Ok(records_deleted)
1658 }
1659}
1660
1661pub struct FjallBackground(FjallWriter);
1662
1663#[async_trait]
1664impl StoreBackground for FjallBackground {
1665 async fn run(mut self, backfill: bool) -> StorageResult<()> {
1666 let mut dirty_nsids = HashSet::new();
1667
        // the backfill condition here is iffy -- a longer interval is good during the main ingest and the
        // collection trims that follow, while a shorter one helps things catch up once those are done.
        // the best setting for non-backfill is non-obvious: it can be pretty slow and still be fine.
1671 let mut rollup =
1672 tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
1673 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1674
1675 // backfill condition again iffy. collection trims should probably happen in their own phase.
1676 let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
1677 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1678
1679 loop {
1680 tokio::select! {
1681 _ = rollup.tick() => {
1682 let mut db = self.0.clone();
1683 let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
1684 if n == 0 {
1685 rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
1686 }
1687 dirty_nsids.extend(dirty);
1688 log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
1689 },
1690 _ = trim.tick() => {
1691 let n = dirty_nsids.len();
1692 log::trace!("trimming {n} nsids: {dirty_nsids:?}");
1693 let t0 = Instant::now();
1694 let (mut total_danglers, mut total_deleted) = (0, 0);
1695 let mut completed = HashSet::new();
1696 for collection in &dirty_nsids {
1697 let mut db = self.0.clone();
1698 let c = collection.clone();
1699 let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
1700 total_danglers += danglers;
1701 total_deleted += deleted;
1702 if !ended_early {
1703 completed.insert(collection.clone());
1704 }
1705 if total_deleted > 10_000_000 {
1706 log::info!("trim stopped early, more than 10M records already deleted.");
1707 break;
1708 }
1709 }
1710 let dt = t0.elapsed();
1711 log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
1712 histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
1713 histogram!("storage_trim_duration").record(dt.as_micros() as f64);
1714 counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
1715 if total_deleted >= total_danglers {
1716 counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
1717 } else {
1718 // TODO: probably think through what's happening here
1719 log::warn!("weird trim case: more danglers than deleted? metric will be missing for dangling=false. deleted={total_deleted} danglers={total_danglers}");
1720 }
1721 for c in completed {
1722 dirty_nsids.remove(&c);
1723 }
1724 },
1725 };
1726 }
1727 }
1728}
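
// A rough usage sketch, not part of this module's API: the background half is
// intended to be driven on its own task alongside the reader/writer, e.g.
// something like
//
//     tokio::spawn(async move {
//         if let Err(e) = background.run(/* backfill: */ false).await {
//             log::error!("storage background task exited: {e:?}");
//         }
//     });
//
// where `background` is the FjallBackground handle (assumption: obtained from
// the same setup that produced the FjallReader and FjallWriter).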
1729
1730/// Get a value from a fixed key
1731fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
1732 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1733 let value = global
1734 .get(&key_bytes)?
1735 .map(|value_bytes| db_complete(&value_bytes))
1736 .transpose()?;
1737 Ok(value)
1738}
1739
1740/// Get a value from a fixed key, reading from a snapshot
1741fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
1742 global: &fjall::Snapshot,
1743) -> StorageResult<Option<V>> {
1744 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1745 let value = global
1746 .get(&key_bytes)?
1747 .map(|value_bytes| db_complete(&value_bytes))
1748 .transpose()?;
1749 Ok(value)
1750}
1751
1752/// Set a value to a fixed key
1753fn insert_static_neu<K: StaticStr>(
1754 global: &PartitionHandle,
1755 value: impl DbBytes,
1756) -> StorageResult<()> {
1757 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1758 let value_bytes = value.to_db_bytes()?;
1759 global.insert(&key_bytes, &value_bytes)?;
1760 Ok(())
1761}
1762
1763/// Set a value to a fixed key, erroring if the value already exists
1764///
1765/// Intended for single-threaded init: not safe under concurrency, since there
1766/// is no transaction between checking whether the value already exists and writing it.
1767fn init_static_neu<K: StaticStr>(
1768 global: &PartitionHandle,
1769 value: impl DbBytes,
1770) -> StorageResult<()> {
1771 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1772 if global.get(&key_bytes)?.is_some() {
1773 return Err(StorageError::InitError(format!(
1774 "init failed: value for key {key_bytes:?} already exists"
1775 )));
1776 }
1777 let value_bytes = value.to_db_bytes()?;
1778 global.insert(&key_bytes, &value_bytes)?;
1779 Ok(())
1780}
1781
1782/// Set a value to a fixed key as part of an existing batch
1783fn insert_batch_static_neu<K: StaticStr>(
1784 batch: &mut FjallBatch,
1785 global: &PartitionHandle,
1786 value: impl DbBytes,
1787) -> StorageResult<()> {
1788 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1789 let value_bytes = value.to_db_bytes()?;
1790 batch.insert(global, &key_bytes, &value_bytes);
1791 Ok(())
1792}
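
// Illustrative only (hypothetical snippet, not called anywhere here): these
// helpers pair a zero-sized StaticStr key type with a DbBytes value, so a
// fixed global key is read and written roughly like
//
//     let cursor: Option<JetstreamCursorValue> =
//         get_static_neu::<JetstreamCursorKey, _>(&global)?;
//     insert_static_neu::<JetstreamCursorKey>(&global, some_cursor_value)?;
//
// where `global` is the global partition handle and `some_cursor_value` is a
// JetstreamCursorValue to persist.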
1793
1794#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
1795pub struct StorageInfo {
1796 pub keyspace_disk_space: u64,
1797 pub keyspace_journal_count: usize,
1798 pub keyspace_sequence: u64,
1799 pub global_approximate_len: usize,
1800}
1801
1802////////// temp stuff to remove:
1803
1804#[cfg(test)]
1805mod tests {
1806 use super::*;
1807 use crate::{DeleteAccount, RecordKey, UFOsCommit};
1808 use jetstream::events::{CommitEvent, CommitOp};
1809 use jetstream::exports::Cid;
1810 use serde_json::value::RawValue;
1811
1812 fn fjall_db() -> (FjallReader, FjallWriter) {
1813 let (read, write, _, _) = FjallStorage::init(
1814 tempfile::tempdir().unwrap(),
1815 "offline test (no real jetstream endpoint)".to_string(),
1816 false,
1817 FjallConfig { temp: true },
1818 )
1819 .unwrap();
1820 (read, write)
1821 }
1822
1823 const TEST_BATCH_LIMIT: usize = 16;
1824 fn beginning() -> HourTruncatedCursor {
1825 Cursor::from_start().into()
1826 }
1827
1828 #[derive(Debug, Default)]
1829 struct TestBatch {
1830 pub batch: EventBatch<TEST_BATCH_LIMIT>,
1831 }
1832
1833 impl TestBatch {
1834 #[allow(clippy::too_many_arguments)]
1835 pub fn create(
1836 &mut self,
1837 did: &str,
1838 collection: &str,
1839 rkey: &str,
1840 record: &str,
1841 rev: Option<&str>,
1842 cid: Option<Cid>,
1843 cursor: u64,
1844 ) -> Nsid {
1845 let did = Did::new(did.to_string()).unwrap();
1846 let collection = Nsid::new(collection.to_string()).unwrap();
1847 let record = RawValue::from_string(record.to_string()).unwrap();
1848 let cid = cid.unwrap_or(
1849 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1850 .parse()
1851 .unwrap(),
1852 );
1853
1854 let event = CommitEvent {
1855 collection,
1856 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1857 rev: rev.unwrap_or("asdf").to_string(),
1858 operation: CommitOp::Create,
1859 record: Some(record),
1860 cid: Some(cid),
1861 };
1862
1863 let (commit, collection) =
1864 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1865 .unwrap();
1866
1867 self.batch
1868 .commits_by_nsid
1869 .entry(collection.clone())
1870 .or_default()
1871 .truncating_insert(commit, &[0u8; 16])
1872 .unwrap();
1873
1874 collection
1875 }
1876 #[allow(clippy::too_many_arguments)]
1877 pub fn update(
1878 &mut self,
1879 did: &str,
1880 collection: &str,
1881 rkey: &str,
1882 record: &str,
1883 rev: Option<&str>,
1884 cid: Option<Cid>,
1885 cursor: u64,
1886 ) -> Nsid {
1887 let did = Did::new(did.to_string()).unwrap();
1888 let collection = Nsid::new(collection.to_string()).unwrap();
1889 let record = RawValue::from_string(record.to_string()).unwrap();
1890 let cid = cid.unwrap_or(
1891 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1892 .parse()
1893 .unwrap(),
1894 );
1895
1896 let event = CommitEvent {
1897 collection,
1898 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1899 rev: rev.unwrap_or("asdf").to_string(),
1900 operation: CommitOp::Update,
1901 record: Some(record),
1902 cid: Some(cid),
1903 };
1904
1905 let (commit, collection) =
1906 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1907 .unwrap();
1908
1909 self.batch
1910 .commits_by_nsid
1911 .entry(collection.clone())
1912 .or_default()
1913 .truncating_insert(commit, &[0u8; 16])
1914 .unwrap();
1915
1916 collection
1917 }
1918 #[allow(clippy::too_many_arguments)]
1919 pub fn delete(
1920 &mut self,
1921 did: &str,
1922 collection: &str,
1923 rkey: &str,
1924 rev: Option<&str>,
1925 cursor: u64,
1926 ) -> Nsid {
1927 let did = Did::new(did.to_string()).unwrap();
1928 let collection = Nsid::new(collection.to_string()).unwrap();
1929 let event = CommitEvent {
1930 collection,
1931 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1932 rev: rev.unwrap_or("asdf").to_string(),
1933 operation: CommitOp::Delete,
1934 record: None,
1935 cid: None,
1936 };
1937
1938 let (commit, collection) =
1939 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap();
1940
1941 self.batch
1942 .commits_by_nsid
1943 .entry(collection.clone())
1944 .or_default()
1945 .truncating_insert(commit, &[0u8; 16])
1946 .unwrap();
1947
1948 collection
1949 }
1950 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did {
1951 let did = Did::new(did.to_string()).unwrap();
1952 self.batch.account_removes.push(DeleteAccount {
1953 did: did.clone(),
1954 cursor: Cursor::from_raw_u64(cursor),
1955 });
1956 did
1957 }
1958 }
1959
1960 #[test]
1961 fn test_hello() -> anyhow::Result<()> {
1962 let (read, mut write) = fjall_db();
1963 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
1964 let JustCount {
1965 creates,
1966 dids_estimate,
1967 ..
1968 } = read.get_collection_counts(
1969 &Nsid::new("a.b.c".to_string()).unwrap(),
1970 beginning(),
1971 None,
1972 )?;
1973 assert_eq!(creates, 0);
1974 assert_eq!(dids_estimate, 0);
1975 Ok(())
1976 }
1977
1978 #[test]
1979 fn test_insert_one() -> anyhow::Result<()> {
1980 let (read, mut write) = fjall_db();
1981
1982 let mut batch = TestBatch::default();
1983 let collection = batch.create(
1984 "did:plc:inze6wrmsm7pjl7yta3oig77",
1985 "a.b.c",
1986 "asdf",
1987 "{}",
1988 Some("rev-z"),
1989 None,
1990 100,
1991 );
1992 write.insert_batch(batch.batch)?;
1993 write.step_rollup()?;
1994
1995 let JustCount {
1996 creates,
1997 dids_estimate,
1998 ..
1999 } = read.get_collection_counts(&collection, beginning(), None)?;
2000 assert_eq!(creates, 1);
2001 assert_eq!(dids_estimate, 1);
2002 let JustCount {
2003 creates,
2004 dids_estimate,
2005 ..
2006 } = read.get_collection_counts(
2007 &Nsid::new("d.e.f".to_string()).unwrap(),
2008 beginning(),
2009 None,
2010 )?;
2011 assert_eq!(creates, 0);
2012 assert_eq!(dids_estimate, 0);
2013
2014 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2015 assert_eq!(records.len(), 1);
2016 let rec = &records[0];
2017 assert_eq!(rec.record.get(), "{}");
2018 assert!(!rec.is_update);
2019
2020 let records = read.get_records_by_collections(
2021 [Nsid::new("d.e.f".to_string()).unwrap()].into(),
2022 2,
2023 false,
2024 )?;
2025 assert_eq!(records.len(), 0);
2026
2027 Ok(())
2028 }
2029
2030 #[test]
2031 fn test_get_multi_collection() -> anyhow::Result<()> {
2032 let (read, mut write) = fjall_db();
2033
2034 let mut batch = TestBatch::default();
2035 batch.create(
2036 "did:plc:inze6wrmsm7pjl7yta3oig77",
2037 "a.a.a",
2038 "aaa",
2039 r#""earliest""#,
2040 Some("rev-a"),
2041 None,
2042 100,
2043 );
2044 batch.create(
2045 "did:plc:inze6wrmsm7pjl7yta3oig77",
2046 "a.a.b",
2047 "aab",
2048 r#""in between""#,
2049 Some("rev-ab"),
2050 None,
2051 101,
2052 );
2053 batch.create(
2054 "did:plc:inze6wrmsm7pjl7yta3oig77",
2055 "a.a.a",
2056 "aaa-2",
2057 r#""last""#,
2058 Some("rev-a-2"),
2059 None,
2060 102,
2061 );
2062 write.insert_batch(batch.batch)?;
2063
2064 let records = read.get_records_by_collections(
2065 HashSet::from([
2066 Nsid::new("a.a.a".to_string()).unwrap(),
2067 Nsid::new("a.a.b".to_string()).unwrap(),
2068 Nsid::new("a.a.c".to_string()).unwrap(),
2069 ]),
2070 100,
2071 false,
2072 )?;
2073 assert_eq!(records.len(), 3);
2074 assert_eq!(records[0].record.get(), r#""last""#);
2075 assert_eq!(
2076 records[0].collection,
2077 Nsid::new("a.a.a".to_string()).unwrap()
2078 );
2079 assert_eq!(records[1].record.get(), r#""in between""#);
2080 assert_eq!(
2081 records[1].collection,
2082 Nsid::new("a.a.b".to_string()).unwrap()
2083 );
2084 assert_eq!(records[2].record.get(), r#""earliest""#);
2085 assert_eq!(
2086 records[2].collection,
2087 Nsid::new("a.a.a".to_string()).unwrap()
2088 );
2089
2090 Ok(())
2091 }
2092
2093 #[test]
2094 fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
2095 let (read, mut write) = fjall_db();
2096
2097 let mut batch = TestBatch::default();
2098 // insert some older ones in aab
2099 for i in 1..=3 {
2100 batch.create(
2101 "did:plc:inze6wrmsm7pjl7yta3oig77",
2102 "a.a.b",
2103 &format!("aab-{i}"),
2104 &format!(r#""b {i}""#),
2105 Some(&format!("rev-b-{i}")),
2106 None,
2107 100 + i,
2108 );
2109 }
2110 // and some newer ones in aaa
2111 for i in 1..=3 {
2112 batch.create(
2113 "did:plc:inze6wrmsm7pjl7yta3oig77",
2114 "a.a.a",
2115 &format!("aaa-{i}"),
2116 &format!(r#""a {i}""#),
2117 Some(&format!("rev-a-{i}")),
2118 None,
2119 200 + i,
2120 );
2121 }
2122 write.insert_batch(batch.batch)?;
2123
2124 let records = read.get_records_by_collections(
2125 HashSet::from([
2126 Nsid::new("a.a.a".to_string()).unwrap(),
2127 Nsid::new("a.a.b".to_string()).unwrap(),
2128 Nsid::new("a.a.c".to_string()).unwrap(),
2129 ]),
2130 2,
2131 true,
2132 )?;
2133 assert_eq!(records.len(), 4);
2134 assert_eq!(records[0].record.get(), r#""a 3""#);
2135 assert_eq!(
2136 records[0].collection,
2137 Nsid::new("a.a.a".to_string()).unwrap()
2138 );
2139
2140 assert_eq!(records[3].record.get(), r#""b 2""#);
2141 assert_eq!(
2142 records[3].collection,
2143 Nsid::new("a.a.b".to_string()).unwrap()
2144 );
2145
2146 Ok(())
2147 }
2148
2149 #[test]
2150 fn test_update_one() -> anyhow::Result<()> {
2151 let (read, mut write) = fjall_db();
2152
2153 let mut batch = TestBatch::default();
2154 let collection = batch.create(
2155 "did:plc:inze6wrmsm7pjl7yta3oig77",
2156 "a.b.c",
2157 "rkey-asdf",
2158 "{}",
2159 Some("rev-a"),
2160 None,
2161 100,
2162 );
2163 write.insert_batch(batch.batch)?;
2164
2165 let mut batch = TestBatch::default();
2166 batch.update(
2167 "did:plc:inze6wrmsm7pjl7yta3oig77",
2168 "a.b.c",
2169 "rkey-asdf",
2170 r#"{"ch": "ch-ch-ch-changes"}"#,
2171 Some("rev-z"),
2172 None,
2173 101,
2174 );
2175 write.insert_batch(batch.batch)?;
2176 write.step_rollup()?;
2177
2178 let JustCount {
2179 creates,
2180 dids_estimate,
2181 ..
2182 } = read.get_collection_counts(&collection, beginning(), None)?;
2183 assert_eq!(creates, 1);
2184 assert_eq!(dids_estimate, 1);
2185
2186 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2187 assert_eq!(records.len(), 1);
2188 let rec = &records[0];
2189 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
2190 assert!(rec.is_update);
2191 Ok(())
2192 }
2193
2194 #[test]
2195 fn test_delete_one() -> anyhow::Result<()> {
2196 let (read, mut write) = fjall_db();
2197
2198 let mut batch = TestBatch::default();
2199 let collection = batch.create(
2200 "did:plc:inze6wrmsm7pjl7yta3oig77",
2201 "a.b.c",
2202 "rkey-asdf",
2203 "{}",
2204 Some("rev-a"),
2205 None,
2206 100,
2207 );
2208 write.insert_batch(batch.batch)?;
2209
2210 let mut batch = TestBatch::default();
2211 batch.delete(
2212 "did:plc:inze6wrmsm7pjl7yta3oig77",
2213 "a.b.c",
2214 "rkey-asdf",
2215 Some("rev-z"),
2216 101,
2217 );
2218 write.insert_batch(batch.batch)?;
2219 write.step_rollup()?;
2220
2221 let JustCount {
2222 creates,
2223 dids_estimate,
2224 ..
2225 } = read.get_collection_counts(&collection, beginning(), None)?;
2226 assert_eq!(creates, 1);
2227 assert_eq!(dids_estimate, 1);
2228
2229 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2230 assert_eq!(records.len(), 0);
2231
2232 Ok(())
2233 }
2234
2235 #[test]
2236 fn test_collection_trim() -> anyhow::Result<()> {
2237 let (read, mut write) = fjall_db();
2238
2239 let mut batch = TestBatch::default();
2240 batch.create(
2241 "did:plc:inze6wrmsm7pjl7yta3oig77",
2242 "a.a.a",
2243 "rkey-aaa",
2244 "{}",
2245 Some("rev-aaa"),
2246 None,
2247 10_000,
2248 );
2249 let mut last_b_cursor;
2250 for i in 1..=10 {
2251 last_b_cursor = 11_000 + i;
2252 batch.create(
2253 &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
2254 "a.a.b",
2255 &format!("rkey-bbb-{i}"),
2256 &format!(r#"{{"n": {i}}}"#),
2257 Some(&format!("rev-bbb-{i}")),
2258 None,
2259 last_b_cursor,
2260 );
2261 }
2262 batch.create(
2263 "did:plc:inze6wrmsm7pjl7yta3oig77",
2264 "a.a.c",
2265 "rkey-ccc",
2266 "{}",
2267 Some("rev-ccc"),
2268 None,
2269 12_000,
2270 );
2271
2272 write.insert_batch(batch.batch)?;
2273
2274 let records = read.get_records_by_collections(
2275 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2276 100,
2277 false,
2278 )?;
2279 assert_eq!(records.len(), 1);
2280 let records = read.get_records_by_collections(
2281 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2282 100,
2283 false,
2284 )?;
2285 assert_eq!(records.len(), 10);
2286 let records = read.get_records_by_collections(
2287 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2288 100,
2289 false,
2290 )?;
2291 assert_eq!(records.len(), 1);
2292 let records = read.get_records_by_collections(
2293 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2294 100,
2295 false,
2296 )?;
2297 assert_eq!(records.len(), 0);
2298
2299 write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
2300 write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
2301 write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
2302 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
2303
2304 let records = read.get_records_by_collections(
2305 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2306 100,
2307 false,
2308 )?;
2309 assert_eq!(records.len(), 1);
2310 let records = read.get_records_by_collections(
2311 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2312 100,
2313 false,
2314 )?;
2315 assert_eq!(records.len(), 6);
2316 let records = read.get_records_by_collections(
2317 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2318 100,
2319 false,
2320 )?;
2321 assert_eq!(records.len(), 1);
2322 let records = read.get_records_by_collections(
2323 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2324 100,
2325 false,
2326 )?;
2327 assert_eq!(records.len(), 0);
2328
2329 Ok(())
2330 }
2331
2332 #[test]
2333 fn test_delete_account() -> anyhow::Result<()> {
2334 let (read, mut write) = fjall_db();
2335
2336 let mut batch = TestBatch::default();
2337 batch.create(
2338 "did:plc:person-a",
2339 "a.a.a",
2340 "rkey-aaa",
2341 "{}",
2342 Some("rev-aaa"),
2343 None,
2344 10_000,
2345 );
2346 for i in 1..=2 {
2347 batch.create(
2348 "did:plc:person-b",
2349 "a.a.a",
2350 &format!("rkey-bbb-{i}"),
2351 &format!(r#"{{"n": {i}}}"#),
2352 Some(&format!("rev-bbb-{i}")),
2353 None,
2354 11_000 + i,
2355 );
2356 }
2357 write.insert_batch(batch.batch)?;
2358
2359 let records = read.get_records_by_collections(
2360 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2361 100,
2362 false,
2363 )?;
2364 assert_eq!(records.len(), 3);
2365
2366 let records_deleted =
2367 write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
2368 assert_eq!(records_deleted, 2);
2369
2370 let records = read.get_records_by_collections(
2371 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2372 100,
2373 false,
2374 )?;
2375 assert_eq!(records.len(), 1);
2376
2377 Ok(())
2378 }
2379
2380 #[test]
2381 fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
2382 let (read, mut write) = fjall_db();
2383
2384 let mut batch = TestBatch::default();
2385 batch.create(
2386 "did:plc:person-a",
2387 "a.a.a",
2388 "rkey-aaa",
2389 "{}",
2390 Some("rev-aaa"),
2391 None,
2392 10_000,
2393 );
2394 write.insert_batch(batch.batch)?;
2395
2396 let mut batch = TestBatch::default();
2397 batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
2398 write.insert_batch(batch.batch)?;
2399
2400 write.step_rollup()?;
2401
2402 let records = read.get_records_by_collections(
2403 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2404 1,
2405 false,
2406 )?;
2407 assert_eq!(records.len(), 0);
2408
2409 Ok(())
2410 }
2411
2412 #[test]
2413 fn rollup_delete_live_count_step() -> anyhow::Result<()> {
2414 let (read, mut write) = fjall_db();
2415
2416 let mut batch = TestBatch::default();
2417 batch.create(
2418 "did:plc:person-a",
2419 "a.a.a",
2420 "rkey-aaa",
2421 "{}",
2422 Some("rev-aaa"),
2423 None,
2424 10_000,
2425 );
2426 write.insert_batch(batch.batch)?;
2427
2428 let (n, _) = write.step_rollup()?;
2429 assert_eq!(n, 1);
2430
2431 let mut batch = TestBatch::default();
2432 batch.delete_account("did:plc:person-a", 10_001);
2433 write.insert_batch(batch.batch)?;
2434
2435 let records = read.get_records_by_collections(
2436 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2437 1,
2438 false,
2439 )?;
2440 assert_eq!(records.len(), 1);
2441
2442 let (n, _) = write.step_rollup()?;
2443 assert_eq!(n, 1);
2444
2445 let records = read.get_records_by_collections(
2446 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2447 1,
2448 false,
2449 )?;
2450 assert_eq!(records.len(), 0);
2451
2452 let mut batch = TestBatch::default();
2453 batch.delete_account("did:plc:person-a", 9_999);
2454 write.insert_batch(batch.batch)?;
2455
2456 let (n, _) = write.step_rollup()?;
2457 assert_eq!(n, 0);
2458
2459 Ok(())
2460 }
2461
2462 #[test]
2463 fn rollup_multiple_count_batches() -> anyhow::Result<()> {
2464 let (_read, mut write) = fjall_db();
2465
2466 let mut batch = TestBatch::default();
2467 batch.create(
2468 "did:plc:person-a",
2469 "a.a.a",
2470 "rkey-aaa",
2471 "{}",
2472 Some("rev-aaa"),
2473 None,
2474 10_000,
2475 );
2476 write.insert_batch(batch.batch)?;
2477
2478 let mut batch = TestBatch::default();
2479 batch.create(
2480 "did:plc:person-a",
2481 "a.a.a",
2482 "rkey-aab",
2483 "{}",
2484 Some("rev-aab"),
2485 None,
2486 10_001,
2487 );
2488 write.insert_batch(batch.batch)?;
2489
2490 let (n, _) = write.step_rollup()?;
2491 assert_eq!(n, 2);
2492
2493 let (n, _) = write.step_rollup()?;
2494 assert_eq!(n, 0);
2495
2496 Ok(())
2497 }
2498
2499 #[test]
2500 fn counts_before_and_after_rollup() -> anyhow::Result<()> {
2501 let (read, mut write) = fjall_db();
2502
2503 let mut batch = TestBatch::default();
2504 batch.create(
2505 "did:plc:person-a",
2506 "a.a.a",
2507 "rkey-aaa",
2508 "{}",
2509 Some("rev-aaa"),
2510 None,
2511 10_000,
2512 );
2513 batch.create(
2514 "did:plc:person-b",
2515 "a.a.a",
2516 "rkey-bbb",
2517 "{}",
2518 Some("rev-bbb"),
2519 None,
2520 10_001,
2521 );
2522 write.insert_batch(batch.batch)?;
2523
2524 let mut batch = TestBatch::default();
2525 batch.delete_account("did:plc:person-a", 11_000);
2526 write.insert_batch(batch.batch)?;
2527
2528 let mut batch = TestBatch::default();
2529 batch.create(
2530 "did:plc:person-a",
2531 "a.a.a",
2532 "rkey-aac",
2533 "{}",
2534 Some("rev-aac"),
2535 None,
2536 12_000,
2537 );
2538 write.insert_batch(batch.batch)?;
2539
2540 // before any rollup
2541 let JustCount {
2542 creates,
2543 dids_estimate,
2544 ..
2545 } = read.get_collection_counts(
2546 &Nsid::new("a.a.a".to_string()).unwrap(),
2547 beginning(),
2548 None,
2549 )?;
2550 assert_eq!(creates, 0);
2551 assert_eq!(dids_estimate, 0);
2552
2553 // first batch rolled up
2554 let (n, _) = write.step_rollup()?;
2555 assert_eq!(n, 1);
2556
2557 let JustCount {
2558 creates,
2559 dids_estimate,
2560 ..
2561 } = read.get_collection_counts(
2562 &Nsid::new("a.a.a".to_string()).unwrap(),
2563 beginning(),
2564 None,
2565 )?;
2566 assert_eq!(creates, 2);
2567 assert_eq!(dids_estimate, 2);
2568
2569 // delete account rolled up
2570 let (n, _) = write.step_rollup()?;
2571 assert_eq!(n, 1);
2572
2573 let JustCount {
2574 creates,
2575 dids_estimate,
2576 ..
2577 } = read.get_collection_counts(
2578 &Nsid::new("a.a.a".to_string()).unwrap(),
2579 beginning(),
2580 None,
2581 )?;
2582 assert_eq!(creates, 2);
2583 assert_eq!(dids_estimate, 2);
2584
2585 // second batch rolled up
2586 let (n, _) = write.step_rollup()?;
2587 assert_eq!(n, 1);
2588
2589 let JustCount {
2590 creates,
2591 dids_estimate,
2592 ..
2593 } = read.get_collection_counts(
2594 &Nsid::new("a.a.a".to_string()).unwrap(),
2595 beginning(),
2596 None,
2597 )?;
2598 assert_eq!(creates, 3);
2599 assert_eq!(dids_estimate, 2);
2600
2601 // no more rollups left
2602 let (n, _) = write.step_rollup()?;
2603 assert_eq!(n, 0);
2604
2605 Ok(())
2606 }
2607
2608 #[test]
2609 fn get_prefix_children_lexi_empty() {
2610 let (read, _) = fjall_db();
2611 let (
2612 JustCount {
2613 creates,
2614 dids_estimate,
2615 ..
2616 },
2617 children,
2618 cursor,
2619 ) = read
2620 .get_prefix(
2621 NsidPrefix::new("aaa.aaa").unwrap(),
2622 10,
2623 OrderCollectionsBy::Lexi { cursor: None },
2624 None,
2625 None,
2626 )
2627 .unwrap();
2628
2629 assert_eq!(creates, 0);
2630 assert_eq!(dids_estimate, 0);
2631 assert_eq!(children, vec![]);
2632 assert_eq!(cursor, None);
2633 }
2634
2635 #[test]
2636 fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
2637 let (read, mut write) = fjall_db();
2638
2639 let mut batch = TestBatch::default();
2640 batch.create(
2641 "did:plc:person-a",
2642 "a.a.a",
2643 "rkey-aaa",
2644 "{}",
2645 Some("rev-aaa"),
2646 None,
2647 10_000,
2648 );
2649 write.insert_batch(batch.batch)?;
2650 write.step_rollup()?;
2651
2652 let (
2653 JustCount {
2654 creates,
2655 dids_estimate,
2656 ..
2657 },
2658 children,
2659 cursor,
2660 ) = read.get_prefix(
2661 NsidPrefix::new("a.a.a").unwrap(),
2662 10,
2663 OrderCollectionsBy::Lexi { cursor: None },
2664 None,
2665 None,
2666 )?;
2667 assert_eq!(creates, 0);
2668 assert_eq!(dids_estimate, 0);
2669 assert_eq!(children, vec![]);
2670 assert_eq!(cursor, None);
2671 Ok(())
2672 }
2673
2674 #[test]
2675 fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
2676 let (read, mut write) = fjall_db();
2677
2678 let mut batch = TestBatch::default();
2679 batch.create(
2680 "did:plc:person-a",
2681 "a.a.aa",
2682 "rkey-aaa",
2683 "{}",
2684 Some("rev-aaa"),
2685 None,
2686 10_000,
2687 );
2688 write.insert_batch(batch.batch)?;
2689 write.step_rollup()?;
2690
2691 let (
2692 JustCount {
2693 creates,
2694 dids_estimate,
2695 ..
2696 },
2697 children,
2698 cursor,
2699 ) = read.get_prefix(
2700 NsidPrefix::new("a.a.a").unwrap(),
2701 10,
2702 OrderCollectionsBy::Lexi { cursor: None },
2703 None,
2704 None,
2705 )?;
2706 assert_eq!(creates, 0);
2707 assert_eq!(dids_estimate, 0);
2708 assert_eq!(children, vec![]);
2709 assert_eq!(cursor, None);
2710 Ok(())
2711 }
2712
2713 #[test]
2714 fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
2715 let (read, mut write) = fjall_db();
2716
2717 let mut batch = TestBatch::default();
2718 batch.create(
2719 "did:plc:person-a",
2720 "a.a.a",
2721 "rkey-aaa",
2722 "{}",
2723 Some("rev-aaa"),
2724 None,
2725 10_000,
2726 );
2727 write.insert_batch(batch.batch)?;
2728 write.step_rollup()?;
2729
2730 let (
2731 JustCount {
2732 creates,
2733 dids_estimate,
2734 ..
2735 },
2736 children,
2737 cursor,
2738 ) = read.get_prefix(
2739 NsidPrefix::new("a.a").unwrap(),
2740 10,
2741 OrderCollectionsBy::Lexi { cursor: None },
2742 None,
2743 None,
2744 )?;
2745 assert_eq!(creates, 1);
2746 assert_eq!(dids_estimate, 1);
2747 assert_eq!(
2748 children,
2749 vec![PrefixChild::Collection(NsidCount {
2750 nsid: "a.a.a".to_string(),
2751 creates: 1,
2752 updates: 0,
2753 deletes: 0,
2754 dids_estimate: 1
2755 }),]
2756 );
2757 assert_eq!(cursor, None);
2758 Ok(())
2759 }
2760
2761 #[test]
2762 fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
2763 let (read, mut write) = fjall_db();
2764
2765 let mut batch = TestBatch::default();
2766 batch.create(
2767 "did:plc:person-a",
2768 "a.a.a.a",
2769 "rkey-aaaa",
2770 "{}",
2771 Some("rev-aaaa"),
2772 None,
2773 10_000,
2774 );
2775 write.insert_batch(batch.batch)?;
2776 write.step_rollup()?;
2777
2778 let (
2779 JustCount {
2780 creates,
2781 dids_estimate,
2782 ..
2783 },
2784 children,
2785 cursor,
2786 ) = read.get_prefix(
2787 NsidPrefix::new("a.a").unwrap(),
2788 10,
2789 OrderCollectionsBy::Lexi { cursor: None },
2790 None,
2791 None,
2792 )?;
2793 assert_eq!(creates, 1);
2794 assert_eq!(dids_estimate, 1);
2795 assert_eq!(
2796 children,
2797 vec![PrefixChild::Prefix(PrefixCount {
2798 prefix: "a.a.a".to_string(),
2799 creates: 1,
2800 updates: 0,
2801 deletes: 0,
2802 dids_estimate: 1,
2803 }),]
2804 );
2805 assert_eq!(cursor, None);
2806 Ok(())
2807 }
2808
2809 #[test]
2810 fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
2811 let (read, mut write) = fjall_db();
2812
2813 let mut batch = TestBatch::default();
2814 batch.create(
2815 "did:plc:person-a",
2816 "a.a.a.a",
2817 "rkey-aaaa",
2818 "{}",
2819 Some("rev-aaaa"),
2820 None,
2821 10_000,
2822 );
2823 batch.create(
2824 "did:plc:person-a",
2825 "a.a.a.b",
2826 "rkey-aaab",
2827 "{}",
2828 Some("rev-aaab"),
2829 None,
2830 10_001,
2831 );
2832 write.insert_batch(batch.batch)?;
2833 write.step_rollup()?;
2834
2835 let (
2836 JustCount {
2837 creates,
2838 dids_estimate,
2839 ..
2840 },
2841 children,
2842 cursor,
2843 ) = read.get_prefix(
2844 NsidPrefix::new("a.a").unwrap(),
2845 10,
2846 OrderCollectionsBy::Lexi { cursor: None },
2847 None,
2848 None,
2849 )?;
2850 assert_eq!(creates, 2);
2851 assert_eq!(dids_estimate, 1);
2852 assert_eq!(
2853 children,
2854 vec![PrefixChild::Prefix(PrefixCount {
2855 prefix: "a.a.a".to_string(),
2856 creates: 2,
2857 updates: 0,
2858 deletes: 0,
2859 dids_estimate: 1
2860 }),]
2861 );
2862 assert_eq!(cursor, None);
2863 Ok(())
2864 }
2865
2866 #[test]
2867 fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
2868 let (read, mut write) = fjall_db();
2869
2870 let mut batch = TestBatch::default();
2871 // exact:
2872 batch.create(
2873 "did:plc:person-a",
2874 "a.a.a",
2875 "rkey-aaa",
2876 "{}",
2877 Some("rev-aaa"),
2878 None,
2879 10_000,
2880 );
2881 // child:
2882 batch.create(
2883 "did:plc:person-a",
2884 "a.a.a.a",
2885 "rkey-aaaa",
2886 "{}",
2887 Some("rev-aaaa"),
2888 None,
2889 10_001,
2890 );
2891 // prefix:
2892 batch.create(
2893 "did:plc:person-a",
2894 "a.a.a.a.a",
2895 "rkey-aaaaa",
2896 "{}",
2897 Some("rev-aaaaa"),
2898 None,
2899 10_002,
2900 );
2901 write.insert_batch(batch.batch)?;
2902 write.step_rollup()?;
2903
2904 let (
2905 JustCount {
2906 creates,
2907 dids_estimate,
2908 ..
2909 },
2910 children,
2911 cursor,
2912 ) = read.get_prefix(
2913 NsidPrefix::new("a.a.a").unwrap(),
2914 10,
2915 OrderCollectionsBy::Lexi { cursor: None },
2916 None,
2917 None,
2918 )?;
2919 assert_eq!(creates, 2);
2920 assert_eq!(dids_estimate, 1);
2921 assert_eq!(
2922 children,
2923 vec![
2924 PrefixChild::Collection(NsidCount {
2925 nsid: "a.a.a.a".to_string(),
2926 creates: 1,
2927 updates: 0,
2928 deletes: 0,
2929 dids_estimate: 1
2930 }),
2931 PrefixChild::Prefix(PrefixCount {
2932 prefix: "a.a.a.a".to_string(),
2933 creates: 1,
2934 updates: 0,
2935 deletes: 0,
2936 dids_estimate: 1
2937 }),
2938 ]
2939 );
2940 assert_eq!(cursor, None);
2941 Ok(())
2942 }
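
    // A minimal extra sketch (not in the original suite): exercises the
    // empty-prefix path of delete_account -- deleting a DID with no stored
    // records should remove nothing and leave other accounts' records intact.
    // The DID strings here are made up for the test, like the ones above.
    #[test]
    fn test_delete_account_unknown_did() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        // person-b has no records, so nothing should be deleted
        let records_deleted =
            write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
        assert_eq!(records_deleted, 0);

        // person-a's record is still there
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);

        Ok(())
    }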
2943}