use crate::db_types::{
    db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
};
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
    DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
    HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
    JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
    NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
    RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
    SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
    WEEK_IN_MICROS,
};
use crate::{
    nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
    NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
};
use async_trait::async_trait;
use fjall::{
    Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
use std::path::Path;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::{Duration, Instant, SystemTime};

const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;

///
/// new data format, roughly:
///
/// Partition: 'global'
///
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
///   - key: "js_cursor" (literal)
///   - val: u64
///
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
///   - key: "js_endpoint" (literal)
///   - val: string (URL of the instance)
///
/// - Launch date
///   - key: "takeoff" (literal)
///   - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Cardinality estimator secret
///   - key: "sketch_secret" (literal)
///   - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
///   - key: "rollup_cursor" (literal)
///   - val: u64 (tracks behind js_cursor)
///
/// - Feed trim cursor (bg work: delete oldest excess records)
///   - key: "trim_cursor" || nullstr (nsid)
///   - val: u64 (earliest previously-removed feed entry jetstream cursor)
///
/// Partition: 'feed'
///
/// - Per-collection list of record references ordered by jetstream cursor
///   - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
///   - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
///
///
/// Partition: 'records'
///
/// - Actual records by their atproto location
///   - key: nullstr || nullstr || nullstr (did, collection, rkey)
///   - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
///
///
/// Partition: 'rollups'
///
/// - Live (batched) records counts and dids estimate per collection
///   - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
///
/// - Hourly total record counts and dids estimate per collection
///   - key: "hourly_counts" || u64 || nullstr (hour, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Hourly record count ranking
///   - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
///   - val: [empty]
///
/// - Hourly did estimate ranking
///   - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
///   - val: [empty]
///
///
/// - Weekly total record counts and dids estimate per collection
///   - key: "weekly_counts" || u64 || nullstr (week, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Weekly record count ranking
///   - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
///   - val: [empty]
///
/// - Weekly did estimate ranking
///   - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
///   - val: [empty]
///
///
/// - All-time total record counts and dids estimate per collection
///   - key: "ever_counts" || nullstr (nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - All-time total record count ranking
///   - key: "ever_rank_records" || u64 || nullstr (count, nsid)
///   - val: [empty]
///
/// - All-time did estimate ranking
///   - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
///   - val: [empty]
///
///
/// Partition: 'queues'
///
/// - Delete account queue
///   - key: "delete_acount" || u64 (js_cursor)
///   - val: nullstr (did)
///
///
/// TODO: moderation actions
/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
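///
/// For orientation, a minimal sketch (error handling and setup elided) of reading one
/// all-time rollup, the same way the reader below does it:
///
/// ```ignore
/// let key = AllTimeRollupKey::new(&nsid).to_db_bytes()?;
/// let counts = rollups
///     .get(&key)?
///     .as_deref()
///     .map(db_complete::<CountsValue>)
///     .transpose()?
///     .unwrap_or_default();
/// let creates_so_far = counts.counts().creates;
/// ```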
#[derive(Debug)]
pub struct FjallStorage {}

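/// Fjall-specific configuration passed to `FjallStorage::init`.
///
/// A setup sketch (path and endpoint are illustrative, not real defaults), mirroring the
/// test helper at the bottom of this file:
///
/// ```ignore
/// let (reader, writer, resume_cursor, sketch_secret) = FjallStorage::init(
///     "/var/lib/ufos/db",
///     "wss://jetstream.example.test".to_string(),
///     false, // don't force an endpoint switch
///     FjallConfig::default(),
/// )?;
/// ```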
#[derive(Debug, Default)]
pub struct FjallConfig {
    /// drop the db when the storage is dropped
    ///
    /// this is only meant for tests
    #[cfg(test)]
    pub temp: bool,
}
149
150impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
151 fn init(
152 path: impl AsRef<Path>,
153 endpoint: String,
154 force_endpoint: bool,
155 _config: FjallConfig,
156 ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
157 let keyspace = {
158 let config = Config::new(path);
159
160 // #[cfg(not(test))]
161 // let config = config.fsync_ms(Some(4_000));
162
163 config.open()?
164 };
165
166 let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
167 let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
168 let records = keyspace.open_partition("records", PartitionCreateOptions::default())?;
169 let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?;
170 let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?;
171
172 let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
173
174 let sketch_secret = if js_cursor.is_some() {
175 let stored_endpoint =
176 get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
177 let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
178 "found cursor but missing js_endpoint, refusing to start.".to_string(),
179 ))?;
180
181 let Some(stored_secret) =
182 get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
183 else {
184 return Err(StorageError::InitError(
185 "found cursor but missing sketch_secret, refusing to start.".to_string(),
186 ));
187 };
188
189 if stored != endpoint {
190 if force_endpoint {
191 log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
192 insert_static_neu::<JetstreamEndpointKey>(
193 &global,
194 JetstreamEndpointValue(endpoint.to_string()),
195 )?;
196 } else {
197 return Err(StorageError::InitError(format!(
198 "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
199 }
200 }
201 stored_secret
202 } else {
203 log::info!("initializing a fresh db!");
204 init_static_neu::<JetstreamEndpointKey>(
205 &global,
206 JetstreamEndpointValue(endpoint.to_string()),
207 )?;
208
209 log::info!("generating new secret for cardinality sketches...");
210 let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
211 getrandom::fill(&mut sketch_secret).map_err(|e| {
212 StorageError::InitError(format!(
213 "failed to get a random secret for cardinality sketches: {e:?}"
214 ))
215 })?;
216 init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
217
218 init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
219 init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
220
221 sketch_secret
222 };
223
224 let reader = FjallReader {
225 keyspace: keyspace.clone(),
226 global: global.clone(),
227 feeds: feeds.clone(),
228 records: records.clone(),
229 rollups: rollups.clone(),
230 };
231 let writer = FjallWriter {
232 bg_taken: Arc::new(AtomicBool::new(false)),
233 keyspace,
234 global,
235 feeds,
236 records,
237 rollups,
238 queues,
239 };
240 Ok((reader, writer, js_cursor, sketch_secret))
241 }
242}
243
244type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>;
245
246#[derive(Clone)]
247pub struct FjallReader {
248 keyspace: Keyspace,
249 global: PartitionHandle,
250 feeds: PartitionHandle,
251 records: PartitionHandle,
252 rollups: PartitionHandle,
253}
254
255/// An iterator that knows how to skip over deleted/invalidated records
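///
/// Usage sketch (the limit of 50 is illustrative), mirroring `get_records_by_collections`:
///
/// ```ignore
/// let iter = RecordIterator::new(&feeds, records.clone(), &collection, 50)?;
/// let mut found = Vec::new();
/// for item in iter {
///     match item? {
///         Some(record) => found.push(record),
///         None => break, // per-collection limit reached
///     }
/// }
/// ```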
256struct RecordIterator {
257 db_iter: Box<dyn Iterator<Item = FjallRKV>>,
258 records: PartitionHandle,
259 limit: usize,
260 fetched: usize,
261}
262impl RecordIterator {
263 pub fn new(
264 feeds: &PartitionHandle,
265 records: PartitionHandle,
266 collection: &Nsid,
267 limit: usize,
268 ) -> StorageResult<Self> {
269 let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
270 let db_iter = feeds.prefix(prefix).rev();
271 Ok(Self {
272 db_iter: Box::new(db_iter),
273 records,
274 limit,
275 fetched: 0,
276 })
277 }
278 fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> {
279 let (key_bytes, val_bytes) = db_next?;
280 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
281 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
282 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
283
284 let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
285 // record was deleted (hopefully)
286 return Ok(None);
287 };
288
289 let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
290
291 if meta.cursor() != feed_key.cursor() {
292 // older/different version
293 return Ok(None);
294 }
295 if meta.rev != feed_val.rev() {
296 // weird...
297 log::warn!("record lookup: cursor match but rev did not...? excluding.");
298 return Ok(None);
299 }
300 let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
301 log::warn!(
302 "record lookup: found record but could not get bytes to decode the record??"
303 );
304 return Ok(None);
305 };
306 let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
307 Ok(Some(UFOsRecord {
308 collection: feed_key.collection().clone(),
309 cursor: feed_key.cursor(),
310 did: feed_val.did().clone(),
311 rkey: feed_val.rkey().clone(),
312 rev: meta.rev.to_string(),
313 record: rawval.try_into()?,
314 is_update: meta.is_update,
315 }))
316 }
317}
318impl Iterator for RecordIterator {
319 type Item = StorageResult<Option<UFOsRecord>>;
320 fn next(&mut self) -> Option<Self::Item> {
321 if self.fetched == self.limit {
322 return Some(Ok(None));
323 }
324 let record = loop {
325 let db_next = self.db_iter.next()?; // None short-circuits here
326 match self.get_record(db_next) {
327 Err(e) => return Some(Err(e)),
328 Ok(Some(record)) => break record,
329 Ok(None) => continue,
330 }
331 };
332 self.fetched += 1;
333 Some(Ok(Some(record)))
334 }
335}
336
337type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
338type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
339type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;
340fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
341 snapshot: &Snapshot,
342 start: Bound<Vec<u8>>,
343 end: Bound<Vec<u8>>,
344) -> StorageResult<NsidCounter> {
345 Ok(Box::new(snapshot.range((start, end)).map(|kv| {
346 let (k_bytes, v_bytes) = kv?;
347 let key = db_complete::<T>(&k_bytes)?;
348 let nsid = key.collection().clone();
349 let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
350 Ok((nsid, get_counts))
351 })))
352}
353type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
354fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
355 snapshot: lsm_tree::Snapshot,
356 start: Bound<Vec<u8>>,
357 end: Bound<Vec<u8>>,
358 get_rollup_key: GetRollupKey,
359) -> StorageResult<NsidCounter> {
360 Ok(Box::new(snapshot.range((start, end)).rev().map(
361 move |kv| {
362 let (k_bytes, _) = kv?;
363 let key = db_complete::<T>(&k_bytes)?;
364 let nsid = key.collection().clone();
365 let get_counts: GetCounts = Box::new({
366 let nsid = nsid.clone();
367 let snapshot = snapshot.clone();
368 let get_rollup_key = get_rollup_key.clone();
369 move || {
370 let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
371 "integrity: all-time rank rollup must have corresponding all-time count rollup",
372 );
373 Ok(db_complete::<CountsValue>(&db_count_bytes)?)
374 }
375 });
376 Ok((nsid, get_counts))
377 },
378 )))
379}
380
381type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;
382
383impl FjallReader {
384 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
385 let rollup_cursor =
386 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
387 .map(|c| c.to_raw_u64());
388
389 Ok(serde_json::json!({
390 "keyspace_disk_space": self.keyspace.disk_space(),
391 "keyspace_journal_count": self.keyspace.journal_count(),
392 "keyspace_sequence": self.keyspace.instant(),
393 "rollup_cursor": rollup_cursor,
394 }))
395 }
396
397 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
398 let global = self.global.snapshot();
399
400 let endpoint =
401 get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
402 .ok_or(StorageError::BadStateError(
403 "Could not find jetstream endpoint".to_string(),
404 ))?
405 .0;
406
407 let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
408 .ok_or(StorageError::BadStateError(
409 "Could not find jetstream takeoff time".to_string(),
410 ))?
411 .to_raw_u64();
412
413 let latest_cursor =
414 get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
415 .map(|c| c.to_raw_u64());
416
417 let rollup_cursor =
418 get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
419 .map(|c| c.to_raw_u64());
420
421 Ok(ConsumerInfo::Jetstream {
422 endpoint,
423 started_at,
424 latest_cursor,
425 rollup_cursor,
426 })
427 }
428
429 fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
430 let cursor = rollups
431 .unwrap_or(&self.rollups.snapshot())
432 .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
433 .next()
434 .transpose()?
435 .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
436 .transpose()?
437 .map(|key| key.cursor())
438 .unwrap_or_else(|| Cursor::from_start().into());
439 Ok(cursor)
440 }
441
442 fn get_lexi_collections(
443 &self,
444 snapshot: Snapshot,
445 limit: usize,
446 cursor: Option<Vec<u8>>,
447 buckets: Vec<CursorBucket>,
448 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
449 let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
450 let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
451 for bucket in &buckets {
452 let it: NsidCounter = match bucket {
453 CursorBucket::Hour(t) => {
454 let start = cursor_nsid
455 .as_ref()
456 .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
457 .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
458 let end = HourlyRollupKey::end(*t)?;
459 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
460 }
461 CursorBucket::Week(t) => {
462 let start = cursor_nsid
463 .as_ref()
464 .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
465 .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
466 let end = WeeklyRollupKey::end(*t)?;
467 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
468 }
469 CursorBucket::AllTime => {
470 let start = cursor_nsid
471 .as_ref()
472 .map(AllTimeRollupKey::after_nsid)
473 .unwrap_or_else(AllTimeRollupKey::start)?;
474 let end = AllTimeRollupKey::end()?;
475 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
476 }
477 };
478 iters.push(it.peekable());
479 }
480
481 let mut out = Vec::new();
482 let mut current_nsid = None;
483 for _ in 0..limit {
484 // double-scan the iters for each element: this could be eliminated but we're starting simple.
485 // first scan: find the lowest nsid
486 // second scan: take + merge, and advance all iters with lowest nsid
487 let mut lowest: Option<Nsid> = None;
488 for iter in &mut iters {
489 if let Some(bla) = iter.peek_mut() {
490 let (nsid, _) = match bla {
491 Ok(v) => v,
492 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
493 };
494 lowest = match lowest {
495 Some(ref current) if nsid.as_str() > current.as_str() => lowest,
496 _ => Some(nsid.clone()),
497 };
498 }
499 }
500 current_nsid = lowest.clone();
501 let Some(nsid) = lowest else { break };
502
503 let mut merged = CountsValue::default();
504 for iter in &mut iters {
505 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
506 if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
507 let counts = get_counts()?;
508 merged.merge(&counts);
509 }
510 }
511 out.push(NsidCount::new(&nsid, &merged));
512 }
513
514 let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
515 Ok((out, next_cursor))
516 }
517
518 fn get_ordered_collections(
519 &self,
520 snapshot: Snapshot,
521 limit: usize,
522 order: OrderCollectionsBy,
523 buckets: Vec<CursorBucket>,
524 ) -> StorageResult<Vec<NsidCount>> {
525 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
526
527 for bucket in buckets {
528 let it: NsidCounter = match (&order, bucket) {
529 (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
530 get_lookup_iter::<HourlyRecordsKey>(
531 snapshot.clone(),
532 HourlyRecordsKey::start(t)?,
533 HourlyRecordsKey::end(t)?,
534 Arc::new({
535 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
536 }),
537 )?
538 }
539 (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
540 get_lookup_iter::<HourlyDidsKey>(
541 snapshot.clone(),
542 HourlyDidsKey::start(t)?,
543 HourlyDidsKey::end(t)?,
544 Arc::new({
545 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
546 }),
547 )?
548 }
549 (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
550 get_lookup_iter::<WeeklyRecordsKey>(
551 snapshot.clone(),
552 WeeklyRecordsKey::start(t)?,
553 WeeklyRecordsKey::end(t)?,
554 Arc::new({
555 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
556 }),
557 )?
558 }
559 (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
560 get_lookup_iter::<WeeklyDidsKey>(
561 snapshot.clone(),
562 WeeklyDidsKey::start(t)?,
563 WeeklyDidsKey::end(t)?,
564 Arc::new({
565 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
566 }),
567 )?
568 }
569 (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
570 get_lookup_iter::<AllTimeRecordsKey>(
571 snapshot.clone(),
572 AllTimeRecordsKey::start()?,
573 AllTimeRecordsKey::end()?,
574 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
575 )?
576 }
577 (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
578 get_lookup_iter::<AllTimeDidsKey>(
579 snapshot.clone(),
580 AllTimeDidsKey::start()?,
581 AllTimeDidsKey::end()?,
582 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
583 )?
584 }
585 (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
586 };
587 iters.push(it);
588 }
589
590 // overfetch by taking a bit more than the limit
591 // merge by collection
592 // sort by requested order, take limit, discard all remaining
593 //
594 // this isn't guaranteed to be correct, but it will hopefully be close most of the time:
595 // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
596 // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah.
597 //
598 // this thing is heavy, there's probably a better way
599 let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
600 for iter in iters {
601 for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
602 let (nsid, get_counts) = pair?;
603 let counts = get_counts()?;
604 ranked.entry(nsid).or_default().merge(&counts);
605 }
606 }
607 let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
608 match order {
609 OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
610 OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
611 OrderCollectionsBy::Lexi { .. } => unreachable!(),
612 }
613 let counts = ranked
614 .into_iter()
615 .rev()
616 .take(limit)
617 .map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
618 .collect();
619 Ok(counts)
620 }
621
622 fn get_collections(
623 &self,
624 limit: usize,
625 order: OrderCollectionsBy,
626 since: Option<HourTruncatedCursor>,
627 until: Option<HourTruncatedCursor>,
628 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
629 let snapshot = self.rollups.snapshot();
630 let buckets = if let (None, None) = (since, until) {
631 vec![CursorBucket::AllTime]
632 } else {
633 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
634 if let Some(specified) = since {
635 if specified > lower {
636 lower = specified;
637 }
638 }
639 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
640 CursorBucket::buckets_spanning(lower, upper)
641 };
642 match order {
643 OrderCollectionsBy::Lexi { cursor } => {
644 self.get_lexi_collections(snapshot, limit, cursor, buckets)
645 }
646 _ => Ok((
647 self.get_ordered_collections(snapshot, limit, order, buckets)?,
648 None,
649 )),
650 }
651 }
652
653 fn get_lexi_prefix(
654 &self,
655 snapshot: Snapshot,
656 prefix: NsidPrefix,
657 limit: usize,
658 cursor: Option<Vec<u8>>,
659 buckets: Vec<CursorBucket>,
660 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
661 // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
662 let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
663 let cursor_child = cursor
664 .as_deref()
665 .map(|encoded_bytes| {
666 let decoded: String = db_complete(encoded_bytes)?;
667 // TODO: write some tests for cursors, there's probably bugs here
668 let as_sub_prefix_with_null = decoded.to_db_bytes()?;
669 Ok::<_, EncodingError>(as_sub_prefix_with_null)
670 })
671 .transpose()?;
672 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
673 for bucket in &buckets {
674 let it: NsidCounter = match bucket {
675 CursorBucket::Hour(t) => {
676 let start = cursor_child
677 .as_ref()
678 .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
679 .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
680 let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
681 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
682 }
683 CursorBucket::Week(t) => {
684 let start = cursor_child
685 .as_ref()
686 .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
687 .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
688 let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
689 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
690 }
691 CursorBucket::AllTime => {
692 let start = cursor_child
693 .as_ref()
694 .map(|child| AllTimeRollupKey::after_nsid_prefix(child))
695 .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
696 let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
697 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
698 }
699 };
700 iters.push(it);
701 }
702
703 // with apologies
704 let mut iters: Vec<_> = iters
705 .into_iter()
706 .map(|it| {
707 it.map(|bla| {
708 bla.map(|(nsid, v)| {
709 let Some(child) = Child::from_prefix(&nsid, &prefix) else {
710 panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
711 };
712 (child, v)
713 })
714 })
715 .peekable()
716 })
717 .collect();
718
719 let mut items = Vec::new();
720 let mut prefix_count = CountsValue::default();
721 #[derive(Debug, Clone, PartialEq)]
722 enum Child {
723 FullNsid(Nsid),
724 ChildPrefix(String),
725 }
726 impl Child {
727 fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
728 if prefix.is_group_of(nsid) {
729 return Some(Child::FullNsid(nsid.clone()));
730 }
731 let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
732 let (segment, _) = suffix.split_once('.').unwrap();
733 let child_prefix = format!("{}.{segment}", prefix.0);
734 Some(Child::ChildPrefix(child_prefix))
735 }
736 fn is_before(&self, other: &Child) -> bool {
737 match (self, other) {
738 (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
739 (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
740 (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
741 (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
742 (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
743 (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
744 }
745 }
746 fn into_inner(self) -> String {
747 match self {
748 Child::FullNsid(s) => s.to_string(),
749 Child::ChildPrefix(s) => s,
750 }
751 }
752 }
753 let mut current_child: Option<Child> = None;
754 for _ in 0..limit {
755 // double-scan the iters for each element: this could be eliminated but we're starting simple.
756 // first scan: find the lowest nsid
757 // second scan: take + merge, and advance all iters with lowest nsid
758 let mut lowest: Option<Child> = None;
759 for iter in &mut iters {
760 if let Some(bla) = iter.peek_mut() {
761 let (child, _) = match bla {
762 Ok(v) => v,
763 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
764 };
765
766 lowest = match lowest {
767 Some(ref current) if current.is_before(child) => lowest,
768 _ => Some(child.clone()),
769 };
770 }
771 }
772 current_child = lowest.clone();
773 let Some(child) = lowest else { break };
774
775 let mut merged = CountsValue::default();
776 for iter in &mut iters {
777 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
778 while let Some(Ok((_, get_counts))) =
779 iter.next_if(|v| v.as_ref().unwrap().0 == child)
780 {
781 let counts = get_counts()?;
782 prefix_count.merge(&counts);
783 merged.merge(&counts);
784 }
785 }
786 items.push(match child {
787 Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
788 Child::ChildPrefix(prefix) => {
789 PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
790 }
791 });
792 }
793
794 // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
795 // ....er the sketch is probably too big
796 // TODO: this is probably buggy on child-type boundaries bleh
797 let next_cursor = current_child
798 .map(|s| s.into_inner().to_db_bytes())
799 .transpose()?;
800
801 Ok(((&prefix_count).into(), items, next_cursor))
802 }
803
804 fn get_prefix(
805 &self,
806 prefix: NsidPrefix,
807 limit: usize,
808 order: OrderCollectionsBy,
809 since: Option<HourTruncatedCursor>,
810 until: Option<HourTruncatedCursor>,
811 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
812 let snapshot = self.rollups.snapshot();
813 let buckets = if let (None, None) = (since, until) {
814 vec![CursorBucket::AllTime]
815 } else {
816 let mut lower = self.get_earliest_hour(Some(&snapshot))?;
817 if let Some(specified) = since {
818 if specified > lower {
819 lower = specified;
820 }
821 }
822 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
823 CursorBucket::buckets_spanning(lower, upper)
824 };
825 match order {
826 OrderCollectionsBy::Lexi { cursor } => {
827 self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
828 }
829 _ => todo!(),
830 }
831 }
832
833 /// - step: output series time step, in seconds
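    ///
    /// Call sketch (values illustrative): daily points for one collection, ending "now":
    ///
    /// ```ignore
    /// let (hours, series) = reader.get_timeseries(
    ///     vec![nsid.clone()],
    ///     since,       // HourTruncatedCursor lower bound
    ///     None,        // until: defaults to the current hour
    ///     24 * 3600,   // step: one output point per 24 hourly rollups
    /// )?;
    /// assert_eq!(series[&nsid].len(), hours.len());
    /// ```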
834 fn get_timeseries(
835 &self,
836 collections: Vec<Nsid>,
837 since: HourTruncatedCursor,
838 until: Option<HourTruncatedCursor>,
839 step: u64,
840 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
841 if step > WEEK_IN_MICROS {
842 panic!("week-stepping is todo");
843 }
844 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
845 let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
846 return Ok((
847 // empty: until < since
848 vec![],
849 collections.into_iter().map(|c| (c, vec![])).collect(),
850 ));
851 };
852 let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
853 let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
854 let snapshot = self.rollups.snapshot();
855 for hour in (0..n_hours).map(|i| since.nth_next(i)) {
856 let mut counts = Vec::with_capacity(collections.len());
857 for nsid in &collections {
858 let count = snapshot
859 .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
860 .as_deref()
861 .map(db_complete::<CountsValue>)
862 .transpose()?
863 .unwrap_or_default();
864 counts.push(count);
865 }
866 counts_by_hour.push((hour, counts));
867 }
868
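        // `step` is in seconds and HOUR_IN_MICROS / 1_000_000 is seconds per hour, so
        // `step_hours` is how many hourly rollups get merged into each output point.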
869 let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
870 let mut output_hours = Vec::with_capacity(step_hours as usize);
871 let mut output_series: CollectionSerieses = collections
872 .iter()
873 .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
874 .collect();
875
876 for chunk in counts_by_hour.chunks(step_hours as usize) {
            output_hours.push(chunk[0].0); // chunks() never yields an empty chunk, so [0] is safe
878 for (i, collection) in collections.iter().enumerate() {
879 let mut c = CountsValue::default();
880 for (_, counts) in chunk {
881 c.merge(&counts[i]);
882 }
883 output_series
884 .get_mut(collection)
885 .expect("output series is initialized with all collections")
886 .push(c);
887 }
888 }
889
890 Ok((output_hours, output_series))
891 }
892
893 fn get_collection_counts(
894 &self,
895 collection: &Nsid,
896 since: HourTruncatedCursor,
897 until: Option<HourTruncatedCursor>,
898 ) -> StorageResult<JustCount> {
899 // grab snapshots in case rollups happen while we're working
900 let rollups = self.rollups.snapshot();
901
902 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
903 let buckets = CursorBucket::buckets_spanning(since, until);
904 let mut total_counts = CountsValue::default();
905
906 for bucket in buckets {
907 let key = match bucket {
908 CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
909 CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
910 CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
911 };
912 let count = rollups
913 .get(&key)?
914 .as_deref()
915 .map(db_complete::<CountsValue>)
916 .transpose()?
917 .unwrap_or_default();
918 total_counts.merge(&count);
919 }
920
921 Ok((&total_counts).into())
922 }
923
924 fn get_records_by_collections(
925 &self,
926 collections: HashSet<Nsid>,
927 limit: usize,
928 expand_each_collection: bool,
929 ) -> StorageResult<Vec<UFOsRecord>> {
930 if collections.is_empty() {
931 return Ok(vec![]);
932 }
933 let mut record_iterators = Vec::new();
934 for collection in collections {
935 let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
936 record_iterators.push(iter.peekable());
937 }
938 let mut merged = Vec::new();
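        // k-way merge, newest first: repeatedly take whichever per-collection iterator
        // currently has the highest cursor at its head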
939 loop {
940 let mut latest: Option<(Cursor, usize)> = None; // ugh
941 for (i, iter) in record_iterators.iter_mut().enumerate() {
942 let Some(it) = iter.peek_mut() else {
943 continue;
944 };
945 let it = match it {
946 Ok(v) => v,
947 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
948 };
949 let Some(rec) = it else {
950 if expand_each_collection {
951 continue;
952 } else {
953 break;
954 }
955 };
956 if let Some((cursor, _)) = latest {
957 if rec.cursor > cursor {
958 latest = Some((rec.cursor, i))
959 }
960 } else {
961 latest = Some((rec.cursor, i));
962 }
963 }
964 let Some((_, idx)) = latest else {
965 break;
966 };
            // unwraps are safe: the peek above already verified this iterator's head is Ok(Some(_))
            merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
969 }
970 Ok(merged)
971 }
972
973 fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
974 let start = AllTimeRollupKey::start()?;
975 let end = AllTimeRollupKey::end()?;
976 let mut matches = Vec::new();
977 let limit = 16; // TODO: param
978 for kv in self.rollups.range((start, end)) {
979 let (key_bytes, val_bytes) = kv?;
980 let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
981 let nsid = key.collection();
982 for term in &terms {
983 if nsid.contains(term) {
984 let counts = db_complete::<CountsValue>(&val_bytes)?;
985 matches.push(NsidCount::new(nsid, &counts));
986 break;
987 }
988 }
989 if matches.len() >= limit {
990 break;
991 }
992 }
993 // TODO: indicate incomplete results
994 Ok(matches)
995 }
996}
997
998#[async_trait]
999impl StoreReader for FjallReader {
1000 fn name(&self) -> String {
1001 "fjall storage v2".into()
1002 }
1003 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1004 let s = self.clone();
1005 tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await?
1006 }
1007 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
1008 let s = self.clone();
1009 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
1010 }
1011 async fn get_collections(
1012 &self,
1013 limit: usize,
1014 order: OrderCollectionsBy,
1015 since: Option<HourTruncatedCursor>,
1016 until: Option<HourTruncatedCursor>,
1017 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
1018 let s = self.clone();
1019 tokio::task::spawn_blocking(move || {
1020 FjallReader::get_collections(&s, limit, order, since, until)
1021 })
1022 .await?
1023 }
1024 async fn get_prefix(
1025 &self,
1026 prefix: NsidPrefix,
1027 limit: usize,
1028 order: OrderCollectionsBy,
1029 since: Option<HourTruncatedCursor>,
1030 until: Option<HourTruncatedCursor>,
1031 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
1032 let s = self.clone();
1033 tokio::task::spawn_blocking(move || {
1034 FjallReader::get_prefix(&s, prefix, limit, order, since, until)
1035 })
1036 .await?
1037 }
1038 async fn get_timeseries(
1039 &self,
1040 collections: Vec<Nsid>,
1041 since: HourTruncatedCursor,
1042 until: Option<HourTruncatedCursor>,
1043 step: u64,
1044 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
1045 let s = self.clone();
1046 tokio::task::spawn_blocking(move || {
1047 FjallReader::get_timeseries(&s, collections, since, until, step)
1048 })
1049 .await?
1050 }
1051 async fn get_collection_counts(
1052 &self,
1053 collection: &Nsid,
1054 since: HourTruncatedCursor,
1055 until: Option<HourTruncatedCursor>,
1056 ) -> StorageResult<JustCount> {
1057 let s = self.clone();
1058 let collection = collection.clone();
1059 tokio::task::spawn_blocking(move || {
1060 FjallReader::get_collection_counts(&s, &collection, since, until)
1061 })
1062 .await?
1063 }
1064 async fn get_records_by_collections(
1065 &self,
1066 collections: HashSet<Nsid>,
1067 limit: usize,
1068 expand_each_collection: bool,
1069 ) -> StorageResult<Vec<UFOsRecord>> {
1070 let s = self.clone();
1071 tokio::task::spawn_blocking(move || {
1072 FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
1073 })
1074 .await?
1075 }
1076 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
1077 let s = self.clone();
1078 tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
1079 }
1080}
1081
1082#[derive(Clone)]
1083pub struct FjallWriter {
1084 bg_taken: Arc<AtomicBool>,
1085 keyspace: Keyspace,
1086 global: PartitionHandle,
1087 feeds: PartitionHandle,
1088 records: PartitionHandle,
1089 rollups: PartitionHandle,
1090 queues: PartitionHandle,
1091}
1092
1093impl FjallWriter {
1094 fn rollup_delete_account(
1095 &mut self,
1096 cursor: Cursor,
1097 key_bytes: &[u8],
1098 val_bytes: &[u8],
1099 ) -> StorageResult<usize> {
1100 let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
1101 self.delete_account(&did)?;
1102 let mut batch = self.keyspace.batch();
1103 batch.remove(&self.queues, key_bytes);
1104 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
1105 batch.commit()?;
1106 Ok(1)
1107 }
1108
1109 fn rollup_live_counts(
1110 &mut self,
1111 timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
1112 cursor_exclusive_limit: Option<Cursor>,
1113 rollup_limit: usize,
1114 ) -> StorageResult<(usize, HashSet<Nsid>)> {
        // Current strategy: buffer counts in memory before writing the rollups. We *could*
        // read+write the rollup entry for every single live batch, but the merge is associative,
        // so merging in memory first saves the db some work up front. Is it worth it? Who knows...
1118
1119 let mut dirty_nsids = HashSet::new();
1120
1121 #[derive(Eq, Hash, PartialEq)]
1122 enum Rollup {
1123 Hourly(HourTruncatedCursor),
1124 Weekly(WeekTruncatedCursor),
1125 AllTime,
1126 }
1127
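        // each live-counts entry fans out into three buckets for its NSID: the hour and
        // week containing its cursor, plus the all-time rollup.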
1128 let mut batch = self.keyspace.batch();
1129 let mut cursors_advanced = 0;
1130 let mut last_cursor = Cursor::from_start();
1131 let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();
1132
1133 for (i, kv) in timelies.enumerate() {
1134 if i >= rollup_limit {
1135 break;
1136 }
1137
1138 let (key_bytes, val_bytes) = kv?;
1139 let key = db_complete::<LiveCountsKey>(&key_bytes)?;
1140
1141 if cursor_exclusive_limit
1142 .map(|limit| key.cursor() > limit)
1143 .unwrap_or(false)
1144 {
1145 break;
1146 }
1147
1148 dirty_nsids.insert(key.collection().clone());
1149
1150 batch.remove(&self.rollups, key_bytes);
1151 let val = db_complete::<CountsValue>(&val_bytes)?;
1152 counts_by_rollup
1153 .entry((
1154 key.collection().clone(),
1155 Rollup::Hourly(key.cursor().into()),
1156 ))
1157 .or_default()
1158 .merge(&val);
1159 counts_by_rollup
1160 .entry((
1161 key.collection().clone(),
1162 Rollup::Weekly(key.cursor().into()),
1163 ))
1164 .or_default()
1165 .merge(&val);
1166 counts_by_rollup
1167 .entry((key.collection().clone(), Rollup::AllTime))
1168 .or_default()
1169 .merge(&val);
1170
1171 cursors_advanced += 1;
1172 last_cursor = key.cursor();
1173 }
1174
1175 // go through each new rollup thing and merge it with whatever might already be in the db
1176 for ((nsid, rollup), counts) in counts_by_rollup {
1177 let rollup_key_bytes = match rollup {
1178 Rollup::Hourly(hourly_cursor) => {
1179 HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
1180 }
1181 Rollup::Weekly(weekly_cursor) => {
1182 WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
1183 }
1184 Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
1185 };
1186 let mut rolled: CountsValue = self
1187 .rollups
1188 .get(&rollup_key_bytes)?
1189 .as_deref()
1190 .map(db_complete::<CountsValue>)
1191 .transpose()?
1192 .unwrap_or_default();
1193
            // now that we have values, we can know the existing ranks
1195 let before_creates_count = rolled.counts().creates;
1196 let before_dids_estimate = rolled.dids().estimate() as u64;
1197
1198 // update the rollup
1199 rolled.merge(&counts);
1200
1201 // new ranks
1202 let new_creates_count = rolled.counts().creates;
1203 let new_dids_estimate = rolled.dids().estimate() as u64;
1204
1205 // update create-ranked secondary index if rank changed
1206 if new_creates_count != before_creates_count {
1207 let (old_k, new_k) = match rollup {
1208 Rollup::Hourly(cursor) => (
1209 HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1210 .to_db_bytes()?,
1211 HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1212 .to_db_bytes()?,
1213 ),
1214 Rollup::Weekly(cursor) => (
1215 WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
1216 .to_db_bytes()?,
1217 WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
1218 .to_db_bytes()?,
1219 ),
1220 Rollup::AllTime => (
1221 AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
1222 AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
1223 ),
1224 };
1225 batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
1226 batch.insert(&self.rollups, &new_k, "");
1227 }
1228
1229 // update dids-ranked secondary index if rank changed
1230 if new_dids_estimate != before_dids_estimate {
1231 let (old_k, new_k) = match rollup {
1232 Rollup::Hourly(cursor) => (
1233 HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1234 .to_db_bytes()?,
1235 HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1236 .to_db_bytes()?,
1237 ),
1238 Rollup::Weekly(cursor) => (
1239 WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
1240 .to_db_bytes()?,
1241 WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
1242 .to_db_bytes()?,
1243 ),
1244 Rollup::AllTime => (
1245 AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
1246 AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
1247 ),
1248 };
1249 batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
1250 batch.insert(&self.rollups, &new_k, "");
1251 }
1252
1253 // replace the main counts rollup
1254 batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
1255 }
1256
1257 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
1258
1259 batch.commit()?;
1260 Ok((cursors_advanced, dirty_nsids))
1261 }
1262}
1263
1264impl StoreWriter<FjallBackground> for FjallWriter {
1265 fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
1266 if self.bg_taken.swap(true, Ordering::SeqCst) {
1267 return Err(StorageError::BackgroundAlreadyStarted);
1268 }
1269 describe_histogram!(
1270 "storage_trim_dirty_nsids",
1271 Unit::Count,
1272 "number of NSIDs trimmed"
1273 );
1274 describe_histogram!(
1275 "storage_trim_duration",
1276 Unit::Microseconds,
1277 "how long it took to trim the dirty NSIDs"
1278 );
1279 describe_counter!(
1280 "storage_trim_removed",
1281 Unit::Count,
1282 "how many records were removed during trim"
1283 );
1284 if reroll {
1285 log::info!("reroll: resetting rollup cursor...");
1286 insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
1287 log::info!("reroll: clearing trim cursors...");
1288 let mut batch = self.keyspace.batch();
1289 for kv in self
1290 .global
1291 .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
1292 &Default::default(),
1293 )?)
1294 {
1295 let (k, _) = kv?;
1296 batch.remove(&self.global, k);
1297 }
1298 let n = batch.len();
1299 batch.commit()?;
1300 log::info!("reroll: cleared {n} trim cursors.");
1301 }
1302 Ok(FjallBackground(self.clone()))
1303 }
1304
1305 fn insert_batch<const LIMIT: usize>(
1306 &mut self,
1307 event_batch: EventBatch<LIMIT>,
1308 ) -> StorageResult<()> {
1309 if event_batch.is_empty() {
1310 return Ok(());
1311 }
1312
1313 let mut batch = self.keyspace.batch();
1314
1315 // would be nice not to have to iterate everything at once here
1316 let latest = event_batch.latest_cursor().unwrap();
1317
1318 for (nsid, commits) in event_batch.commits_by_nsid {
1319 for commit in commits.commits {
1320 let location_key: RecordLocationKey = (&commit, &nsid).into();
1321
1322 match commit.action {
1323 CommitAction::Cut => {
1324 batch.remove(&self.records, &location_key.to_db_bytes()?);
1325 }
1326 CommitAction::Put(put_action) => {
1327 let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
1328 let feed_val: NsidRecordFeedVal =
1329 (&commit.did, &commit.rkey, commit.rev.as_str()).into();
1330 batch.insert(
1331 &self.feeds,
1332 feed_key.to_db_bytes()?,
1333 feed_val.to_db_bytes()?,
1334 );
1335
1336 let location_val: RecordLocationVal =
1337 (commit.cursor, commit.rev.as_str(), put_action).into();
1338 batch.insert(
1339 &self.records,
1340 &location_key.to_db_bytes()?,
1341 &location_val.to_db_bytes()?,
1342 );
1343 }
1344 }
1345 }
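            // one live-counts entry per NSID per batch, keyed at the batch's latest cursor;
            // step_rollup later folds these into the hourly/weekly/all-time rollups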
1346 let live_counts_key: LiveCountsKey = (latest, &nsid).into();
1347 let counts_value = CountsValue::new(
1348 CommitCounts {
1349 creates: commits.creates as u64,
1350 updates: commits.updates as u64,
1351 deletes: commits.deletes as u64,
1352 },
1353 commits.dids_estimate,
1354 );
1355 batch.insert(
1356 &self.rollups,
1357 &live_counts_key.to_db_bytes()?,
1358 &counts_value.to_db_bytes()?,
1359 );
1360 }
1361
1362 for remove in event_batch.account_removes {
1363 let queue_key = DeleteAccountQueueKey::new(remove.cursor);
1364 let queue_val: DeleteAccountQueueVal = remove.did;
1365 batch.insert(
1366 &self.queues,
1367 &queue_key.to_db_bytes()?,
1368 &queue_val.to_db_bytes()?,
1369 );
1370 }
1371
1372 batch.insert(
1373 &self.global,
1374 DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
1375 latest.to_db_bytes()?,
1376 );
1377
1378 batch.commit()?;
1379 Ok(())
1380 }
1381
1382 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
1383 let mut dirty_nsids = HashSet::new();
1384
1385 let rollup_cursor =
1386 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
1387 StorageError::BadStateError("Could not find current rollup cursor".to_string()),
1388 )?;
1389
1390 // timelies
1391 let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
1392 let mut timely_iter = self.rollups.range(live_counts_range).peekable();
1393
1394 let timely_next = timely_iter
1395 .peek_mut()
1396 .map(|kv| -> StorageResult<LiveCountsKey> {
1397 match kv {
1398 Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
1399 Ok((key_bytes, _)) => {
1400 let key = db_complete::<LiveCountsKey>(key_bytes)?;
1401 Ok(key)
1402 }
1403 }
1404 })
1405 .transpose()?;
1406
1407 // delete accounts
1408 let delete_accounts_range =
1409 DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
1410
1411 let next_delete = self
1412 .queues
1413 .range(delete_accounts_range)
1414 .next()
1415 .transpose()?
1416 .map(|(key_bytes, val_bytes)| {
1417 db_complete::<DeleteAccountQueueKey>(&key_bytes)
1418 .map(|k| (k.suffix, key_bytes, val_bytes))
1419 })
1420 .transpose()?;
1421
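        // process whichever pending item comes first in cursor order, so the rollup cursor
        // only ever advances past work that has actually been applied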
1422 let cursors_stepped = match (timely_next, next_delete) {
1423 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1424 if timely.cursor() < delete_cursor {
1425 let (n, dirty) = self.rollup_live_counts(
1426 timely_iter,
1427 Some(delete_cursor),
1428 MAX_BATCHED_ROLLUP_COUNTS,
1429 )?;
1430 dirty_nsids.extend(dirty);
1431 n
1432 } else {
1433 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1434 }
1435 }
1436 (Some(_), None) => {
1437 let (n, dirty) =
1438 self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?;
1439 dirty_nsids.extend(dirty);
1440 n
1441 }
1442 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
1443 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
1444 }
1445 (None, None) => 0,
1446 };
1447
1448 Ok((cursors_stepped, dirty_nsids))
1449 }
1450
1451 fn trim_collection(
1452 &mut self,
1453 collection: &Nsid,
1454 limit: usize,
1455 full_scan: bool,
1456 ) -> StorageResult<(usize, usize, bool)> {
1457 let mut dangling_feed_keys_cleaned = 0;
1458 let mut records_deleted = 0;
1459
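        // a full scan walks this collection's entire feed; otherwise resume from the
        // per-collection trim cursor stored in the global partition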
1460 let live_range = if full_scan {
1461 let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
1462 let end = NsidRecordFeedKey::prefix_range_end(collection)?;
1463 start..end
1464 } else {
1465 let feed_trim_cursor_key =
1466 TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
1467 let trim_cursor = self
1468 .global
1469 .get(&feed_trim_cursor_key)?
1470 .map(|value_bytes| db_complete(&value_bytes))
1471 .transpose()?
1472 .unwrap_or(Cursor::from_start());
1473 NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
1474 };
1475
1476 let mut live_records_found = 0;
1477 let mut candidate_new_feed_lower_cursor = None;
1478 let ended_early = false;
1479 let mut current_cursor: Option<Cursor> = None;
1480 for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
1481 if i > 0 && i % 500_000 == 0 {
1482 log::info!(
1483 "trim: at {i} for {:?} (now at {})",
1484 collection.to_string(),
1485 current_cursor
1486 .map(|c| c
1487 .elapsed()
1488 .map(nice_duration)
1489 .unwrap_or("[not past]".into()))
1490 .unwrap_or("??".into()),
1491 );
1492 }
1493 let (key_bytes, val_bytes) = kv?;
1494 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
1495 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
1496 let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
1497 let location_key_bytes = location_key.to_db_bytes()?;
1498
1499 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
1500 // record was deleted (hopefully)
1501 self.feeds.remove(&*key_bytes)?;
1502 dangling_feed_keys_cleaned += 1;
1503 continue;
1504 };
1505
1506 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
1507 current_cursor = Some(meta.cursor());
1508
1509 if meta.cursor() != feed_key.cursor() {
1510 // older/different version
1511 self.feeds.remove(&*key_bytes)?;
1512 dangling_feed_keys_cleaned += 1;
1513 continue;
1514 }
1515 if meta.rev != feed_val.rev() {
1516 // weird...
1517 log::warn!("record lookup: cursor match but rev did not...? removing.");
1518 self.records.remove(&location_key_bytes)?;
1519 self.feeds.remove(&*key_bytes)?;
1520 dangling_feed_keys_cleaned += 1;
1521 continue;
1522 }
1523
1524 live_records_found += 1;
1525 if live_records_found <= limit {
1526 continue;
1527 }
1528 if candidate_new_feed_lower_cursor.is_none() {
1529 candidate_new_feed_lower_cursor = Some(feed_key.cursor());
1530 }
1531
            self.records.remove(&location_key_bytes)?; // the record itself lives in the records partition
            self.feeds.remove(key_bytes)?;
1534 records_deleted += 1;
1535 }
1536
1537 if !ended_early {
1538 if let Some(new_cursor) = candidate_new_feed_lower_cursor {
1539 self.global.insert(
1540 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
1541 &new_cursor.to_db_bytes()?,
1542 )?;
1543 }
1544 }
1545
1546 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
1547 Ok((dangling_feed_keys_cleaned, records_deleted, ended_early))
1548 }
1549
1550 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
1551 let mut records_deleted = 0;
1552 let mut batch = self.keyspace.batch();
1553 let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?;
1554 for kv in self.records.prefix(prefix) {
1555 let (key_bytes, _) = kv?;
1556 batch.remove(&self.records, key_bytes);
1557 records_deleted += 1;
1558 if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
1559 batch.commit()?;
1560 batch = self.keyspace.batch();
1561 }
1562 }
1563 batch.commit()?;
1564 Ok(records_deleted)
1565 }
1566}
1567
1568pub struct FjallBackground(FjallWriter);
1569
1570#[async_trait]
1571impl StoreBackground for FjallBackground {
1572 async fn run(mut self, backfill: bool) -> StorageResult<()> {
1573 let mut dirty_nsids = HashSet::new();
1574
1575 // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims
1576 // shorter once those are done helps things catch up
1577 // the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine
1578 let mut rollup =
1579 tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
1580 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1581
1582 // backfill condition again iffy. collection trims should probably happen in their own phase.
1583 let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
1584 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1585
1586 loop {
1587 tokio::select! {
1588 _ = rollup.tick() => {
1589 let mut db = self.0.clone();
1590 let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
1591 if n == 0 {
1592 rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
1593 }
1594 dirty_nsids.extend(dirty);
1595 log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
1596 },
1597 _ = trim.tick() => {
1598 let n = dirty_nsids.len();
1599 log::trace!("trimming {n} nsids: {dirty_nsids:?}");
1600 let t0 = Instant::now();
1601 let (mut total_danglers, mut total_deleted) = (0, 0);
1602 let mut completed = HashSet::new();
1603 for collection in &dirty_nsids {
1604 let mut db = self.0.clone();
1605 let c = collection.clone();
1606 let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
1607 total_danglers += danglers;
1608 total_deleted += deleted;
1609 if !ended_early {
1610 completed.insert(collection.clone());
1611 }
1612 if total_deleted > 10_000_000 {
1613 log::info!("trim stopped early, more than 10M records already deleted.");
1614 break;
1615 }
1616 }
1617 let dt = t0.elapsed();
1618 log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
1619 histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
1620 histogram!("storage_trim_duration").record(dt.as_micros() as f64);
1621 counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
1622 counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
1623 for c in completed {
1624 dirty_nsids.remove(&c);
1625 }
1626 },
1627 };
1628 }
1629 }
1630}
1631
1632/// Get a value from a fixed key
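///
/// Call sketch using types from this module:
///
/// ```ignore
/// let cursor: Option<JetstreamCursorValue> =
///     get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
/// ```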
1633fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
1634 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1635 let value = global
1636 .get(&key_bytes)?
1637 .map(|value_bytes| db_complete(&value_bytes))
1638 .transpose()?;
1639 Ok(value)
1640}
1641
1642/// Get a value from a fixed key
1643fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
1644 global: &fjall::Snapshot,
1645) -> StorageResult<Option<V>> {
1646 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1647 let value = global
1648 .get(&key_bytes)?
1649 .map(|value_bytes| db_complete(&value_bytes))
1650 .transpose()?;
1651 Ok(value)
1652}
1653
1654/// Set a value to a fixed key
1655fn insert_static_neu<K: StaticStr>(
1656 global: &PartitionHandle,
1657 value: impl DbBytes,
1658) -> StorageResult<()> {
1659 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1660 let value_bytes = value.to_db_bytes()?;
1661 global.insert(&key_bytes, &value_bytes)?;
1662 Ok(())
1663}
1664
/// Set a value to a fixed key, erroring if the value already exists
///
/// Intended for single-threaded init: not safe under concurrency, since there
/// is no transaction between checking whether the value already exists and writing it.
1669fn init_static_neu<K: StaticStr>(
1670 global: &PartitionHandle,
1671 value: impl DbBytes,
1672) -> StorageResult<()> {
1673 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1674 if global.get(&key_bytes)?.is_some() {
1675 return Err(StorageError::InitError(format!(
1676 "init failed: value for key {key_bytes:?} already exists"
1677 )));
1678 }
1679 let value_bytes = value.to_db_bytes()?;
1680 global.insert(&key_bytes, &value_bytes)?;
1681 Ok(())
1682}
1683
1684/// Set a value to a fixed key
1685fn insert_batch_static_neu<K: StaticStr>(
1686 batch: &mut FjallBatch,
1687 global: &PartitionHandle,
1688 value: impl DbBytes,
1689) -> StorageResult<()> {
1690 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1691 let value_bytes = value.to_db_bytes()?;
1692 batch.insert(global, &key_bytes, &value_bytes);
1693 Ok(())
1694}
1695
1696#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
1697pub struct StorageInfo {
1698 pub keyspace_disk_space: u64,
1699 pub keyspace_journal_count: usize,
1700 pub keyspace_sequence: u64,
1701 pub global_approximate_len: usize,
1702}
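
// Illustrative sketch: `StorageInfo` derives `serde::Serialize`, so it can be rendered
// as JSON for a status/debug endpoint. Assumes `serde_json` is available as a regular
// dependency (its `RawValue` type is already used for records in this file's tests).
#[allow(dead_code)]
fn example_storage_info_json(info: &StorageInfo) -> serde_json::Result<String> {
    serde_json::to_string_pretty(info)
}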
1703
1706#[cfg(test)]
1707mod tests {
1708 use super::*;
1709 use crate::{DeleteAccount, RecordKey, UFOsCommit};
1710 use jetstream::events::{CommitEvent, CommitOp};
1711 use jetstream::exports::Cid;
1712 use serde_json::value::RawValue;
1713
1714 fn fjall_db() -> (FjallReader, FjallWriter) {
1715 let (read, write, _, _) = FjallStorage::init(
1716 tempfile::tempdir().unwrap(),
1717 "offline test (no real jetstream endpoint)".to_string(),
1718 false,
1719 FjallConfig { temp: true },
1720 )
1721 .unwrap();
1722 (read, write)
1723 }
1724
1725 const TEST_BATCH_LIMIT: usize = 16;
1726 fn beginning() -> HourTruncatedCursor {
1727 Cursor::from_start().into()
1728 }
1729
1730 #[derive(Debug, Default)]
1731 struct TestBatch {
1732 pub batch: EventBatch<TEST_BATCH_LIMIT>,
1733 }
1734
1735 impl TestBatch {
1736 #[allow(clippy::too_many_arguments)]
1737 pub fn create(
1738 &mut self,
1739 did: &str,
1740 collection: &str,
1741 rkey: &str,
1742 record: &str,
1743 rev: Option<&str>,
1744 cid: Option<Cid>,
1745 cursor: u64,
1746 ) -> Nsid {
1747 let did = Did::new(did.to_string()).unwrap();
1748 let collection = Nsid::new(collection.to_string()).unwrap();
1749 let record = RawValue::from_string(record.to_string()).unwrap();
1750 let cid = cid.unwrap_or(
1751 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1752 .parse()
1753 .unwrap(),
1754 );
1755
1756 let event = CommitEvent {
1757 collection,
1758 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1759 rev: rev.unwrap_or("asdf").to_string(),
1760 operation: CommitOp::Create,
1761 record: Some(record),
1762 cid: Some(cid),
1763 };
1764
1765 let (commit, collection) =
1766 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1767 .unwrap();
1768
1769 self.batch
1770 .commits_by_nsid
1771 .entry(collection.clone())
1772 .or_default()
1773 .truncating_insert(commit, &[0u8; 16])
1774 .unwrap();
1775
1776 collection
1777 }
1778 #[allow(clippy::too_many_arguments)]
1779 pub fn update(
1780 &mut self,
1781 did: &str,
1782 collection: &str,
1783 rkey: &str,
1784 record: &str,
1785 rev: Option<&str>,
1786 cid: Option<Cid>,
1787 cursor: u64,
1788 ) -> Nsid {
1789 let did = Did::new(did.to_string()).unwrap();
1790 let collection = Nsid::new(collection.to_string()).unwrap();
1791 let record = RawValue::from_string(record.to_string()).unwrap();
1792 let cid = cid.unwrap_or(
1793 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1794 .parse()
1795 .unwrap(),
1796 );
1797
1798 let event = CommitEvent {
1799 collection,
1800 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1801 rev: rev.unwrap_or("asdf").to_string(),
1802 operation: CommitOp::Update,
1803 record: Some(record),
1804 cid: Some(cid),
1805 };
1806
1807 let (commit, collection) =
1808 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1809 .unwrap();
1810
1811 self.batch
1812 .commits_by_nsid
1813 .entry(collection.clone())
1814 .or_default()
1815 .truncating_insert(commit, &[0u8; 16])
1816 .unwrap();
1817
1818 collection
1819 }
1820 #[allow(clippy::too_many_arguments)]
1821 pub fn delete(
1822 &mut self,
1823 did: &str,
1824 collection: &str,
1825 rkey: &str,
1826 rev: Option<&str>,
1827 cursor: u64,
1828 ) -> Nsid {
1829 let did = Did::new(did.to_string()).unwrap();
1830 let collection = Nsid::new(collection.to_string()).unwrap();
1831 let event = CommitEvent {
1832 collection,
1833 rkey: RecordKey::new(rkey.to_string()).unwrap(),
1834 rev: rev.unwrap_or("asdf").to_string(),
1835 operation: CommitOp::Delete,
1836 record: None,
1837 cid: None,
1838 };
1839
1840 let (commit, collection) =
1841 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap();
1842
1843 self.batch
1844 .commits_by_nsid
1845 .entry(collection.clone())
1846 .or_default()
1847 .truncating_insert(commit, &[0u8; 16])
1848 .unwrap();
1849
1850 collection
1851 }
1852 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did {
1853 let did = Did::new(did.to_string()).unwrap();
1854 self.batch.account_removes.push(DeleteAccount {
1855 did: did.clone(),
1856 cursor: Cursor::from_raw_u64(cursor),
1857 });
1858 did
1859 }
1860 }
1861
1862 #[test]
1863 fn test_hello() -> anyhow::Result<()> {
1864 let (read, mut write) = fjall_db();
1865 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
1866 let JustCount {
1867 creates,
1868 dids_estimate,
1869 ..
1870 } = read.get_collection_counts(
1871 &Nsid::new("a.b.c".to_string()).unwrap(),
1872 beginning(),
1873 None,
1874 )?;
1875 assert_eq!(creates, 0);
1876 assert_eq!(dids_estimate, 0);
1877 Ok(())
1878 }
1879
1880 #[test]
1881 fn test_insert_one() -> anyhow::Result<()> {
1882 let (read, mut write) = fjall_db();
1883
1884 let mut batch = TestBatch::default();
1885 let collection = batch.create(
1886 "did:plc:inze6wrmsm7pjl7yta3oig77",
1887 "a.b.c",
1888 "asdf",
1889 "{}",
1890 Some("rev-z"),
1891 None,
1892 100,
1893 );
1894 write.insert_batch(batch.batch)?;
1895 write.step_rollup()?;
1896
1897 let JustCount {
1898 creates,
1899 dids_estimate,
1900 ..
1901 } = read.get_collection_counts(&collection, beginning(), None)?;
1902 assert_eq!(creates, 1);
1903 assert_eq!(dids_estimate, 1);
1904 let JustCount {
1905 creates,
1906 dids_estimate,
1907 ..
1908 } = read.get_collection_counts(
1909 &Nsid::new("d.e.f".to_string()).unwrap(),
1910 beginning(),
1911 None,
1912 )?;
1913 assert_eq!(creates, 0);
1914 assert_eq!(dids_estimate, 0);
1915
1916 let records = read.get_records_by_collections([collection].into(), 2, false)?;
1917 assert_eq!(records.len(), 1);
1918 let rec = &records[0];
1919 assert_eq!(rec.record.get(), "{}");
1920 assert!(!rec.is_update);
1921
1922 let records = read.get_records_by_collections(
1923 [Nsid::new("d.e.f".to_string()).unwrap()].into(),
1924 2,
1925 false,
1926 )?;
1927 assert_eq!(records.len(), 0);
1928
1929 Ok(())
1930 }
1931
1932 #[test]
1933 fn test_get_multi_collection() -> anyhow::Result<()> {
1934 let (read, mut write) = fjall_db();
1935
1936 let mut batch = TestBatch::default();
1937 batch.create(
1938 "did:plc:inze6wrmsm7pjl7yta3oig77",
1939 "a.a.a",
1940 "aaa",
1941 r#""earliest""#,
1942 Some("rev-a"),
1943 None,
1944 100,
1945 );
1946 batch.create(
1947 "did:plc:inze6wrmsm7pjl7yta3oig77",
1948 "a.a.b",
1949 "aab",
1950 r#""in between""#,
1951 Some("rev-ab"),
1952 None,
1953 101,
1954 );
1955 batch.create(
1956 "did:plc:inze6wrmsm7pjl7yta3oig77",
1957 "a.a.a",
1958 "aaa-2",
1959 r#""last""#,
1960 Some("rev-a-2"),
1961 None,
1962 102,
1963 );
1964 write.insert_batch(batch.batch)?;
1965
1966 let records = read.get_records_by_collections(
1967 HashSet::from([
1968 Nsid::new("a.a.a".to_string()).unwrap(),
1969 Nsid::new("a.a.b".to_string()).unwrap(),
1970 Nsid::new("a.a.c".to_string()).unwrap(),
1971 ]),
1972 100,
1973 false,
1974 )?;
1975 assert_eq!(records.len(), 3);
1976 assert_eq!(records[0].record.get(), r#""last""#);
1977 assert_eq!(
1978 records[0].collection,
1979 Nsid::new("a.a.a".to_string()).unwrap()
1980 );
1981 assert_eq!(records[1].record.get(), r#""in between""#);
1982 assert_eq!(
1983 records[1].collection,
1984 Nsid::new("a.a.b".to_string()).unwrap()
1985 );
1986 assert_eq!(records[2].record.get(), r#""earliest""#);
1987 assert_eq!(
1988 records[2].collection,
1989 Nsid::new("a.a.a".to_string()).unwrap()
1990 );
1991
1992 Ok(())
1993 }
1994
1995 #[test]
1996 fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
1997 let (read, mut write) = fjall_db();
1998
1999 let mut batch = TestBatch::default();
2000 // insert some older ones in aab
2001 for i in 1..=3 {
2002 batch.create(
2003 "did:plc:inze6wrmsm7pjl7yta3oig77",
2004 "a.a.b",
2005 &format!("aab-{i}"),
2006 &format!(r#""b {i}""#),
2007 Some(&format!("rev-b-{i}")),
2008 None,
2009 100 + i,
2010 );
2011 }
2012 // and some newer ones in aaa
2013 for i in 1..=3 {
2014 batch.create(
2015 "did:plc:inze6wrmsm7pjl7yta3oig77",
2016 "a.a.a",
2017 &format!("aaa-{i}"),
2018 &format!(r#""a {i}""#),
2019 Some(&format!("rev-a-{i}")),
2020 None,
2021 200 + i,
2022 );
2023 }
2024 write.insert_batch(batch.batch)?;
2025
2026 let records = read.get_records_by_collections(
2027 HashSet::from([
2028 Nsid::new("a.a.a".to_string()).unwrap(),
2029 Nsid::new("a.a.b".to_string()).unwrap(),
2030 Nsid::new("a.a.c".to_string()).unwrap(),
2031 ]),
2032 2,
2033 true,
2034 )?;
2035 assert_eq!(records.len(), 4);
2036 assert_eq!(records[0].record.get(), r#""a 3""#);
2037 assert_eq!(
2038 records[0].collection,
2039 Nsid::new("a.a.a".to_string()).unwrap()
2040 );
2041
2042 assert_eq!(records[3].record.get(), r#""b 2""#);
2043 assert_eq!(
2044 records[3].collection,
2045 Nsid::new("a.a.b".to_string()).unwrap()
2046 );
2047
2048 Ok(())
2049 }
2050
2051 #[test]
2052 fn test_update_one() -> anyhow::Result<()> {
2053 let (read, mut write) = fjall_db();
2054
2055 let mut batch = TestBatch::default();
2056 let collection = batch.create(
2057 "did:plc:inze6wrmsm7pjl7yta3oig77",
2058 "a.b.c",
2059 "rkey-asdf",
2060 "{}",
2061 Some("rev-a"),
2062 None,
2063 100,
2064 );
2065 write.insert_batch(batch.batch)?;
2066
2067 let mut batch = TestBatch::default();
2068 batch.update(
2069 "did:plc:inze6wrmsm7pjl7yta3oig77",
2070 "a.b.c",
2071 "rkey-asdf",
2072 r#"{"ch": "ch-ch-ch-changes"}"#,
2073 Some("rev-z"),
2074 None,
2075 101,
2076 );
2077 write.insert_batch(batch.batch)?;
2078 write.step_rollup()?;
2079
2080 let JustCount {
2081 creates,
2082 dids_estimate,
2083 ..
2084 } = read.get_collection_counts(&collection, beginning(), None)?;
2085 assert_eq!(creates, 1);
2086 assert_eq!(dids_estimate, 1);
2087
2088 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2089 assert_eq!(records.len(), 1);
2090 let rec = &records[0];
2091 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
2092 assert!(rec.is_update);
2093 Ok(())
2094 }
2095
2096 #[test]
2097 fn test_delete_one() -> anyhow::Result<()> {
2098 let (read, mut write) = fjall_db();
2099
2100 let mut batch = TestBatch::default();
2101 let collection = batch.create(
2102 "did:plc:inze6wrmsm7pjl7yta3oig77",
2103 "a.b.c",
2104 "rkey-asdf",
2105 "{}",
2106 Some("rev-a"),
2107 None,
2108 100,
2109 );
2110 write.insert_batch(batch.batch)?;
2111
2112 let mut batch = TestBatch::default();
2113 batch.delete(
2114 "did:plc:inze6wrmsm7pjl7yta3oig77",
2115 "a.b.c",
2116 "rkey-asdf",
2117 Some("rev-z"),
2118 101,
2119 );
2120 write.insert_batch(batch.batch)?;
2121 write.step_rollup()?;
2122
2123 let JustCount {
2124 creates,
2125 dids_estimate,
2126 ..
2127 } = read.get_collection_counts(&collection, beginning(), None)?;
2128 assert_eq!(creates, 1);
2129 assert_eq!(dids_estimate, 1);
2130
2131 let records = read.get_records_by_collections([collection].into(), 2, false)?;
2132 assert_eq!(records.len(), 0);
2133
2134 Ok(())
2135 }
2136
2137 #[test]
2138 fn test_collection_trim() -> anyhow::Result<()> {
2139 let (read, mut write) = fjall_db();
2140
2141 let mut batch = TestBatch::default();
2142 batch.create(
2143 "did:plc:inze6wrmsm7pjl7yta3oig77",
2144 "a.a.a",
2145 "rkey-aaa",
2146 "{}",
2147 Some("rev-aaa"),
2148 None,
2149 10_000,
2150 );
2151 let mut last_b_cursor;
2152 for i in 1..=10 {
2153 last_b_cursor = 11_000 + i;
2154 batch.create(
2155 &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
2156 "a.a.b",
2157 &format!("rkey-bbb-{i}"),
2158 &format!(r#"{{"n": {i}}}"#),
2159 Some(&format!("rev-bbb-{i}")),
2160 None,
2161 last_b_cursor,
2162 );
2163 }
2164 batch.create(
2165 "did:plc:inze6wrmsm7pjl7yta3oig77",
2166 "a.a.c",
2167 "rkey-ccc",
2168 "{}",
2169 Some("rev-ccc"),
2170 None,
2171 12_000,
2172 );
2173
2174 write.insert_batch(batch.batch)?;
2175
2176 let records = read.get_records_by_collections(
2177 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2178 100,
2179 false,
2180 )?;
2181 assert_eq!(records.len(), 1);
2182 let records = read.get_records_by_collections(
2183 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2184 100,
2185 false,
2186 )?;
2187 assert_eq!(records.len(), 10);
2188 let records = read.get_records_by_collections(
2189 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2190 100,
2191 false,
2192 )?;
2193 assert_eq!(records.len(), 1);
2194 let records = read.get_records_by_collections(
2195 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2196 100,
2197 false,
2198 )?;
2199 assert_eq!(records.len(), 0);
2200
2201 write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
2202 write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
2203 write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
2204 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;
2205
2206 let records = read.get_records_by_collections(
2207 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2208 100,
2209 false,
2210 )?;
2211 assert_eq!(records.len(), 1);
2212 let records = read.get_records_by_collections(
2213 HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
2214 100,
2215 false,
2216 )?;
2217 assert_eq!(records.len(), 6);
2218 let records = read.get_records_by_collections(
2219 HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
2220 100,
2221 false,
2222 )?;
2223 assert_eq!(records.len(), 1);
2224 let records = read.get_records_by_collections(
2225 HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
2226 100,
2227 false,
2228 )?;
2229 assert_eq!(records.len(), 0);
2230
2231 Ok(())
2232 }
2233
2234 #[test]
2235 fn test_delete_account() -> anyhow::Result<()> {
2236 let (read, mut write) = fjall_db();
2237
2238 let mut batch = TestBatch::default();
2239 batch.create(
2240 "did:plc:person-a",
2241 "a.a.a",
2242 "rkey-aaa",
2243 "{}",
2244 Some("rev-aaa"),
2245 None,
2246 10_000,
2247 );
2248 for i in 1..=2 {
2249 batch.create(
2250 "did:plc:person-b",
2251 "a.a.a",
2252 &format!("rkey-bbb-{i}"),
2253 &format!(r#"{{"n": {i}}}"#),
2254 Some(&format!("rev-bbb-{i}")),
2255 None,
2256 11_000 + i,
2257 );
2258 }
2259 write.insert_batch(batch.batch)?;
2260
2261 let records = read.get_records_by_collections(
2262 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2263 100,
2264 false,
2265 )?;
2266 assert_eq!(records.len(), 3);
2267
2268 let records_deleted =
2269 write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
2270 assert_eq!(records_deleted, 2);
2271
2272 let records = read.get_records_by_collections(
2273 HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
2274 100,
2275 false,
2276 )?;
2277 assert_eq!(records.len(), 1);
2278
2279 Ok(())
2280 }
2281
2282 #[test]
2283 fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
2284 let (read, mut write) = fjall_db();
2285
2286 let mut batch = TestBatch::default();
2287 batch.create(
2288 "did:plc:person-a",
2289 "a.a.a",
2290 "rkey-aaa",
2291 "{}",
2292 Some("rev-aaa"),
2293 None,
2294 10_000,
2295 );
2296 write.insert_batch(batch.batch)?;
2297
2298 let mut batch = TestBatch::default();
2299 batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
2300 write.insert_batch(batch.batch)?;
2301
2302 write.step_rollup()?;
2303
2304 let records = read.get_records_by_collections(
2305 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2306 1,
2307 false,
2308 )?;
2309 assert_eq!(records.len(), 0);
2310
2311 Ok(())
2312 }
2313
2314 #[test]
2315 fn rollup_delete_live_count_step() -> anyhow::Result<()> {
2316 let (read, mut write) = fjall_db();
2317
2318 let mut batch = TestBatch::default();
2319 batch.create(
2320 "did:plc:person-a",
2321 "a.a.a",
2322 "rkey-aaa",
2323 "{}",
2324 Some("rev-aaa"),
2325 None,
2326 10_000,
2327 );
2328 write.insert_batch(batch.batch)?;
2329
2330 let (n, _) = write.step_rollup()?;
2331 assert_eq!(n, 1);
2332
2333 let mut batch = TestBatch::default();
2334 batch.delete_account("did:plc:person-a", 10_001);
2335 write.insert_batch(batch.batch)?;
2336
2337 let records = read.get_records_by_collections(
2338 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2339 1,
2340 false,
2341 )?;
2342 assert_eq!(records.len(), 1);
2343
2344 let (n, _) = write.step_rollup()?;
2345 assert_eq!(n, 1);
2346
2347 let records = read.get_records_by_collections(
2348 [Nsid::new("a.a.a".to_string()).unwrap()].into(),
2349 1,
2350 false,
2351 )?;
2352 assert_eq!(records.len(), 0);
2353
2354 let mut batch = TestBatch::default();
2355 batch.delete_account("did:plc:person-a", 9_999);
2356 write.insert_batch(batch.batch)?;
2357
2358 let (n, _) = write.step_rollup()?;
2359 assert_eq!(n, 0);
2360
2361 Ok(())
2362 }
2363
2364 #[test]
2365 fn rollup_multiple_count_batches() -> anyhow::Result<()> {
2366 let (_read, mut write) = fjall_db();
2367
2368 let mut batch = TestBatch::default();
2369 batch.create(
2370 "did:plc:person-a",
2371 "a.a.a",
2372 "rkey-aaa",
2373 "{}",
2374 Some("rev-aaa"),
2375 None,
2376 10_000,
2377 );
2378 write.insert_batch(batch.batch)?;
2379
2380 let mut batch = TestBatch::default();
2381 batch.create(
2382 "did:plc:person-a",
2383 "a.a.a",
2384 "rkey-aab",
2385 "{}",
2386 Some("rev-aab"),
2387 None,
2388 10_001,
2389 );
2390 write.insert_batch(batch.batch)?;
2391
2392 let (n, _) = write.step_rollup()?;
2393 assert_eq!(n, 2);
2394
2395 let (n, _) = write.step_rollup()?;
2396 assert_eq!(n, 0);
2397
2398 Ok(())
2399 }
2400
2401 #[test]
2402 fn counts_before_and_after_rollup() -> anyhow::Result<()> {
2403 let (read, mut write) = fjall_db();
2404
2405 let mut batch = TestBatch::default();
2406 batch.create(
2407 "did:plc:person-a",
2408 "a.a.a",
2409 "rkey-aaa",
2410 "{}",
2411 Some("rev-aaa"),
2412 None,
2413 10_000,
2414 );
2415 batch.create(
2416 "did:plc:person-b",
2417 "a.a.a",
2418 "rkey-bbb",
2419 "{}",
2420 Some("rev-bbb"),
2421 None,
2422 10_001,
2423 );
2424 write.insert_batch(batch.batch)?;
2425
2426 let mut batch = TestBatch::default();
2427 batch.delete_account("did:plc:person-a", 11_000);
2428 write.insert_batch(batch.batch)?;
2429
2430 let mut batch = TestBatch::default();
2431 batch.create(
2432 "did:plc:person-a",
2433 "a.a.a",
2434 "rkey-aac",
2435 "{}",
2436 Some("rev-aac"),
2437 None,
2438 12_000,
2439 );
2440 write.insert_batch(batch.batch)?;
2441
2442 // before any rollup
2443 let JustCount {
2444 creates,
2445 dids_estimate,
2446 ..
2447 } = read.get_collection_counts(
2448 &Nsid::new("a.a.a".to_string()).unwrap(),
2449 beginning(),
2450 None,
2451 )?;
2452 assert_eq!(creates, 0);
2453 assert_eq!(dids_estimate, 0);
2454
2455 // first batch rolled up
2456 let (n, _) = write.step_rollup()?;
2457 assert_eq!(n, 1);
2458
2459 let JustCount {
2460 creates,
2461 dids_estimate,
2462 ..
2463 } = read.get_collection_counts(
2464 &Nsid::new("a.a.a".to_string()).unwrap(),
2465 beginning(),
2466 None,
2467 )?;
2468 assert_eq!(creates, 2);
2469 assert_eq!(dids_estimate, 2);
2470
2471 // delete account rolled up
2472 let (n, _) = write.step_rollup()?;
2473 assert_eq!(n, 1);
2474
2475 let JustCount {
2476 creates,
2477 dids_estimate,
2478 ..
2479 } = read.get_collection_counts(
2480 &Nsid::new("a.a.a".to_string()).unwrap(),
2481 beginning(),
2482 None,
2483 )?;
2484 assert_eq!(creates, 2);
2485 assert_eq!(dids_estimate, 2);
2486
2487 // second batch rolled up
2488 let (n, _) = write.step_rollup()?;
2489 assert_eq!(n, 1);
2490
2491 let JustCount {
2492 creates,
2493 dids_estimate,
2494 ..
2495 } = read.get_collection_counts(
2496 &Nsid::new("a.a.a".to_string()).unwrap(),
2497 beginning(),
2498 None,
2499 )?;
2500 assert_eq!(creates, 3);
2501 assert_eq!(dids_estimate, 2);
2502
2503 // no more rollups left
2504 let (n, _) = write.step_rollup()?;
2505 assert_eq!(n, 0);
2506
2507 Ok(())
2508 }
2509
2510 #[test]
2511 fn get_prefix_children_lexi_empty() {
2512 let (read, _) = fjall_db();
2513 let (
2514 JustCount {
2515 creates,
2516 dids_estimate,
2517 ..
2518 },
2519 children,
2520 cursor,
2521 ) = read
2522 .get_prefix(
2523 NsidPrefix::new("aaa.aaa").unwrap(),
2524 10,
2525 OrderCollectionsBy::Lexi { cursor: None },
2526 None,
2527 None,
2528 )
2529 .unwrap();
2530
2531 assert_eq!(creates, 0);
2532 assert_eq!(dids_estimate, 0);
2533 assert_eq!(children, vec![]);
2534 assert_eq!(cursor, None);
2535 }
2536
2537 #[test]
2538 fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
2539 let (read, mut write) = fjall_db();
2540
2541 let mut batch = TestBatch::default();
2542 batch.create(
2543 "did:plc:person-a",
2544 "a.a.a",
2545 "rkey-aaa",
2546 "{}",
2547 Some("rev-aaa"),
2548 None,
2549 10_000,
2550 );
2551 write.insert_batch(batch.batch)?;
2552 write.step_rollup()?;
2553
2554 let (
2555 JustCount {
2556 creates,
2557 dids_estimate,
2558 ..
2559 },
2560 children,
2561 cursor,
2562 ) = read.get_prefix(
2563 NsidPrefix::new("a.a.a").unwrap(),
2564 10,
2565 OrderCollectionsBy::Lexi { cursor: None },
2566 None,
2567 None,
2568 )?;
2569 assert_eq!(creates, 0);
2570 assert_eq!(dids_estimate, 0);
2571 assert_eq!(children, vec![]);
2572 assert_eq!(cursor, None);
2573 Ok(())
2574 }
2575
2576 #[test]
2577 fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
2578 let (read, mut write) = fjall_db();
2579
2580 let mut batch = TestBatch::default();
2581 batch.create(
2582 "did:plc:person-a",
2583 "a.a.aa",
2584 "rkey-aaa",
2585 "{}",
2586 Some("rev-aaa"),
2587 None,
2588 10_000,
2589 );
2590 write.insert_batch(batch.batch)?;
2591 write.step_rollup()?;
2592
2593 let (
2594 JustCount {
2595 creates,
2596 dids_estimate,
2597 ..
2598 },
2599 children,
2600 cursor,
2601 ) = read.get_prefix(
2602 NsidPrefix::new("a.a.a").unwrap(),
2603 10,
2604 OrderCollectionsBy::Lexi { cursor: None },
2605 None,
2606 None,
2607 )?;
2608 assert_eq!(creates, 0);
2609 assert_eq!(dids_estimate, 0);
2610 assert_eq!(children, vec![]);
2611 assert_eq!(cursor, None);
2612 Ok(())
2613 }
2614
2615 #[test]
2616 fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
2617 let (read, mut write) = fjall_db();
2618
2619 let mut batch = TestBatch::default();
2620 batch.create(
2621 "did:plc:person-a",
2622 "a.a.a",
2623 "rkey-aaa",
2624 "{}",
2625 Some("rev-aaa"),
2626 None,
2627 10_000,
2628 );
2629 write.insert_batch(batch.batch)?;
2630 write.step_rollup()?;
2631
2632 let (
2633 JustCount {
2634 creates,
2635 dids_estimate,
2636 ..
2637 },
2638 children,
2639 cursor,
2640 ) = read.get_prefix(
2641 NsidPrefix::new("a.a").unwrap(),
2642 10,
2643 OrderCollectionsBy::Lexi { cursor: None },
2644 None,
2645 None,
2646 )?;
2647 assert_eq!(creates, 1);
2648 assert_eq!(dids_estimate, 1);
2649 assert_eq!(
2650 children,
2651 vec![PrefixChild::Collection(NsidCount {
2652 nsid: "a.a.a".to_string(),
2653 creates: 1,
2654 updates: 0,
2655 deletes: 0,
2656 dids_estimate: 1
2657 }),]
2658 );
2659 assert_eq!(cursor, None);
2660 Ok(())
2661 }
2662
2663 #[test]
2664 fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
2665 let (read, mut write) = fjall_db();
2666
2667 let mut batch = TestBatch::default();
2668 batch.create(
2669 "did:plc:person-a",
2670 "a.a.a.a",
2671 "rkey-aaaa",
2672 "{}",
2673 Some("rev-aaaa"),
2674 None,
2675 10_000,
2676 );
2677 write.insert_batch(batch.batch)?;
2678 write.step_rollup()?;
2679
2680 let (
2681 JustCount {
2682 creates,
2683 dids_estimate,
2684 ..
2685 },
2686 children,
2687 cursor,
2688 ) = read.get_prefix(
2689 NsidPrefix::new("a.a").unwrap(),
2690 10,
2691 OrderCollectionsBy::Lexi { cursor: None },
2692 None,
2693 None,
2694 )?;
2695 assert_eq!(creates, 1);
2696 assert_eq!(dids_estimate, 1);
2697 assert_eq!(
2698 children,
2699 vec![PrefixChild::Prefix(PrefixCount {
2700 prefix: "a.a.a".to_string(),
2701 creates: 1,
2702 updates: 0,
2703 deletes: 0,
2704 dids_estimate: 1,
2705 }),]
2706 );
2707 assert_eq!(cursor, None);
2708 Ok(())
2709 }
2710
2711 #[test]
2712 fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
2713 let (read, mut write) = fjall_db();
2714
2715 let mut batch = TestBatch::default();
2716 batch.create(
2717 "did:plc:person-a",
2718 "a.a.a.a",
2719 "rkey-aaaa",
2720 "{}",
2721 Some("rev-aaaa"),
2722 None,
2723 10_000,
2724 );
2725 batch.create(
2726 "did:plc:person-a",
2727 "a.a.a.b",
2728 "rkey-aaab",
2729 "{}",
2730 Some("rev-aaab"),
2731 None,
2732 10_001,
2733 );
2734 write.insert_batch(batch.batch)?;
2735 write.step_rollup()?;
2736
2737 let (
2738 JustCount {
2739 creates,
2740 dids_estimate,
2741 ..
2742 },
2743 children,
2744 cursor,
2745 ) = read.get_prefix(
2746 NsidPrefix::new("a.a").unwrap(),
2747 10,
2748 OrderCollectionsBy::Lexi { cursor: None },
2749 None,
2750 None,
2751 )?;
2752 assert_eq!(creates, 2);
2753 assert_eq!(dids_estimate, 1);
2754 assert_eq!(
2755 children,
2756 vec![PrefixChild::Prefix(PrefixCount {
2757 prefix: "a.a.a".to_string(),
2758 creates: 2,
2759 updates: 0,
2760 deletes: 0,
2761 dids_estimate: 1
2762 }),]
2763 );
2764 assert_eq!(cursor, None);
2765 Ok(())
2766 }
2767
2768 #[test]
2769 fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
2770 let (read, mut write) = fjall_db();
2771
2772 let mut batch = TestBatch::default();
2773 // exact:
2774 batch.create(
2775 "did:plc:person-a",
2776 "a.a.a",
2777 "rkey-aaa",
2778 "{}",
2779 Some("rev-aaa"),
2780 None,
2781 10_000,
2782 );
2783 // child:
2784 batch.create(
2785 "did:plc:person-a",
2786 "a.a.a.a",
2787 "rkey-aaaa",
2788 "{}",
2789 Some("rev-aaaa"),
2790 None,
2791 10_001,
2792 );
2793 // prefix:
2794 batch.create(
2795 "did:plc:person-a",
2796 "a.a.a.a.a",
2797 "rkey-aaaaa",
2798 "{}",
2799 Some("rev-aaaaa"),
2800 None,
2801 10_002,
2802 );
2803 write.insert_batch(batch.batch)?;
2804 write.step_rollup()?;
2805
2806 let (
2807 JustCount {
2808 creates,
2809 dids_estimate,
2810 ..
2811 },
2812 children,
2813 cursor,
2814 ) = read.get_prefix(
2815 NsidPrefix::new("a.a.a").unwrap(),
2816 10,
2817 OrderCollectionsBy::Lexi { cursor: None },
2818 None,
2819 None,
2820 )?;
2821 assert_eq!(creates, 2);
2822 assert_eq!(dids_estimate, 1);
2823 assert_eq!(
2824 children,
2825 vec![
2826 PrefixChild::Collection(NsidCount {
2827 nsid: "a.a.a.a".to_string(),
2828 creates: 1,
2829 updates: 0,
2830 deletes: 0,
2831 dids_estimate: 1
2832 }),
2833 PrefixChild::Prefix(PrefixCount {
2834 prefix: "a.a.a.a".to_string(),
2835 creates: 1,
2836 updates: 0,
2837 deletes: 0,
2838 dids_estimate: 1
2839 }),
2840 ]
2841 );
2842 assert_eq!(cursor, None);
2843 Ok(())
2844 }
2845}