Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use crate::db_types::{
    db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
};
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
    DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
    HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
    JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
    NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
    RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
    SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
    WEEK_IN_MICROS,
};
use crate::{
    nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
    NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
};
use async_trait::async_trait;
use fjall::{
    Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
use lsm_tree::AbstractTree;
use metrics::{
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
};
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
use std::path::Path;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::{Duration, Instant, SystemTime};

// Cap on records buffered per batch while deleting an account's data.
const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
// Cap on live-count entries folded into rollups per background batch.
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;

///
/// new data format, roughly:
///
/// Partition: 'global'
///
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
///     - key: "js_cursor" (literal)
///     - val: u64
///
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
///     - key: "js_endpoint" (literal)
///     - val: string (URL of the instance)
///
/// - Launch date
///     - key: "takeoff" (literal)
///     - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Cardinality estimator secret
///     - key: "sketch_secret" (literal)
///     - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
///     - key: "rollup_cursor" (literal)
///     - val: u64 (tracks behind js_cursor)
///
/// - Feed trim cursor (bg work: delete oldest excess records)
///     - key: "trim_cursor" || nullstr (nsid)
///     - val: u64 (earliest previously-removed feed entry jetstream cursor)
///
/// Partition: 'feed'
/// (NOTE(review): init() opens this partition under the name "feeds" -- the
/// singular here appears to be doc shorthand; confirm against on-disk name.)
///
/// - Per-collection list of record references ordered by jetstream cursor
///     - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
///     - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
///
///
/// Partition: 'records'
///
/// - Actual records by their atproto location
///     - key: nullstr || nullstr || nullstr (did, collection, rkey)
///     - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
///
///
/// Partition: 'rollups'
///
/// - Live (batched) records counts and dids estimate per collection
///     - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
///     - val: u64 || HLL (count (not cursor), estimator)
///
///
/// - Hourly total record counts and dids estimate per collection
///     - key: "hourly_counts" || u64 || nullstr (hour, nsid)
///     - val: u64 || HLL (count (not cursor), estimator)
///
/// - Hourly record count ranking
///     - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
///     - val: [empty]
///
/// - Hourly did estimate ranking
///     - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
///     - val: [empty]
///
///
/// - Weekly total record counts and dids estimate per collection
///     - key: "weekly_counts" || u64 || nullstr (week, nsid)
///     - val: u64 || HLL (count (not cursor), estimator)
///
/// - Weekly record count ranking
///     - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
///     - val: [empty]
///
/// - Weekly did estimate ranking
///     - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
///     - val: [empty]
///
///
/// - All-time total record counts and dids estimate per collection
///     - key: "ever_counts" || nullstr (nsid)
///     - val: u64 || HLL (count (not cursor), estimator)
///
/// - All-time total record record count ranking
///     - key: "ever_rank_records" || u64 || nullstr (count, nsid)
///     - val: [empty]
///
/// - All-time did estimate ranking
///     - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
///     - val: [empty]
///
///
/// Partition: 'queues'
///
/// - Delete account queue
///     - key: "delete_acount" || u64 (js_cursor)
///       (NOTE(review): "delete_acount" spelling presumably mirrors the
///       DeleteAccountQueueKey literal -- verify before "fixing" it here.)
///     - val: nullstr (did)
///
///
/// TODO: moderation actions
/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
#[derive(Debug)]
pub struct FjallStorage {}

#[derive(Debug, Default)]
pub struct FjallConfig {
    /// drop the db when the storage is dropped
    ///
    /// this is only meant for tests
    #[cfg(test)]
    pub temp: bool,
}

impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
    /// Open (or create) the keyspace at `path` and hand back a reader/writer pair.
    ///
    /// Consistency rules enforced here:
    /// - a stored jetstream cursor requires a stored endpoint and sketch secret,
    ///   otherwise startup is refused (the cursor is meaningless without them);
    /// - an endpoint mismatch is an error unless `force_endpoint` is set;
    /// - a fresh db gets a random sketch secret, takeoff timestamp, and a
    ///   rollup cursor initialized to the start.
    ///
    /// Returns `(reader, writer, resume_cursor, sketch_secret)` where
    /// `resume_cursor` is `None` for a fresh database.
    fn init(
        path: impl AsRef<Path>,
        endpoint: String,
        force_endpoint: bool,
        _config: FjallConfig,
    ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
        let keyspace = {
            let config = Config::new(path);

            // #[cfg(not(test))]
            // let config = config.fsync_ms(Some(4_000));

            config.open()?
        };

        let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
        let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
        let records = keyspace.open_partition("records", PartitionCreateOptions::default())?;
        let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?;
        let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?;

        // Some(cursor) means this db has consumed jetstream before.
        let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;

        let sketch_secret = if js_cursor.is_some() {
            // Existing db: endpoint + secret must both be present and sane.
            let stored_endpoint =
                get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
            let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
                "found cursor but missing js_endpoint, refusing to start.".to_string(),
            ))?;

            let Some(stored_secret) =
                get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
            else {
                return Err(StorageError::InitError(
                    "found cursor but missing sketch_secret, refusing to start.".to_string(),
                ));
            };

            if stored != endpoint {
                if force_endpoint {
                    log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
                    insert_static_neu::<JetstreamEndpointKey>(
                        &global,
                        JetstreamEndpointValue(endpoint.to_string()),
                    )?;
                } else {
                    return Err(StorageError::InitError(format!(
                        "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
                }
            }
            stored_secret
        } else {
            // Fresh db: persist endpoint, generate sketch secret, stamp takeoff,
            // and start the rollup cursor at the beginning.
            log::info!("initializing a fresh db!");
            init_static_neu::<JetstreamEndpointKey>(
                &global,
                JetstreamEndpointValue(endpoint.to_string()),
            )?;

            log::info!("generating new secret for cardinality sketches...");
            let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
            getrandom::fill(&mut sketch_secret).map_err(|e| {
                StorageError::InitError(format!(
                    "failed to get a random secret for cardinality sketches: {e:?}"
                ))
            })?;
            init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;

            init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
            init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;

            sketch_secret
        };

        let reader = FjallReader {
            keyspace: keyspace.clone(),
            global: global.clone(),
            feeds: feeds.clone(),
            records: records.clone(),
            rollups: rollups.clone(),
            queues: queues.clone(),
        };
        reader.describe_metrics();
        let writer = FjallWriter {
            bg_taken: Arc::new(AtomicBool::new(false)),
            keyspace,
            global,
            feeds,
            records,
            rollups,
            queues,
        };
        writer.describe_metrics();
        Ok((reader, writer, js_cursor, sketch_secret))
    }
}

// Shorthand for fjall's raw (key, value) iterator item.
type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>;

/// Read-only handle over all partitions; cheap to clone (fjall handles are
/// internally reference-counted), which is how the async wrappers below move
/// it into `spawn_blocking` closures.
#[derive(Clone)]
pub struct FjallReader {
    keyspace: Keyspace,
    global: PartitionHandle,
    feeds: PartitionHandle,
    records: PartitionHandle,
    rollups: PartitionHandle,
    queues: PartitionHandle,
}

/// An iterator that knows how to skip over deleted/invalidated records
struct RecordIterator {
    // Newest-first walk over one collection's feed entries.
    db_iter: Box<dyn Iterator<Item = FjallRKV>>,
    records: PartitionHandle,
    limit: usize,
    fetched: usize,
}
impl RecordIterator {
    pub fn new(
        feeds: &PartitionHandle,
        records: PartitionHandle,
        collection: &Nsid,
        limit: usize,
    ) -> StorageResult<Self> {
        let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
        // .rev(): feed keys are cursor-ordered, so reverse = newest first.
        let db_iter = feeds.prefix(prefix).rev();
        Ok(Self {
            db_iter: Box::new(db_iter),
            records,
            limit,
            fetched: 0,
        })
    }
    /// Resolve one feed entry to its record, returning Ok(None) when the feed
    /// entry is stale (record deleted, or superseded by a newer version).
    fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> {
        let (key_bytes, val_bytes) = db_next?;
        let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
        let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
        let location_key: RecordLocationKey = (&feed_key, &feed_val).into();

        let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
            // record was deleted (hopefully)
            return Ok(None);
        };

        // `n` is the byte offset where the raw record payload begins.
        let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;

        if meta.cursor() != feed_key.cursor() {
            // older/different version
            return Ok(None);
        }
        if meta.rev != feed_val.rev() {
            // weird...
            log::warn!("record lookup: cursor match but rev did not...? excluding.");
            return Ok(None);
        }
        let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
            log::warn!(
                "record lookup: found record but could not get bytes to decode the record??"
            );
            return Ok(None);
        };
        let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
        Ok(Some(UFOsRecord {
            collection: feed_key.collection().clone(),
            cursor: feed_key.cursor(),
            did: feed_val.did().clone(),
            rkey: feed_val.rkey().clone(),
            rev: meta.rev.to_string(),
            record: rawval.try_into()?,
            is_update: meta.is_update,
        }))
    }
}
impl Iterator for RecordIterator {
    // Some(Ok(None)) signals "limit reached" to the merge loop in
    // get_records_by_collections, distinct from iterator exhaustion (None).
    type Item = StorageResult<Option<UFOsRecord>>;
    fn next(&mut self) -> Option<Self::Item> {
        if self.fetched == self.limit {
            return Some(Ok(None));
        }
        let record = loop {
            let db_next = self.db_iter.next()?; // None short-circuits here
            match self.get_record(db_next) {
                Err(e) => return Some(Err(e)),
                Ok(Some(record)) => break record,
                Ok(None) => continue, // stale feed entry: skip without counting
            }
        };
        self.fetched += 1;
        Some(Ok(Some(record)))
    }
}

// Lazy count fetch: rank-based iterators only touch the counts rollup when the
// caller actually wants the value.
type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;

/// Iterate a counts-rollup key range in lexicographic (nsid) order; the value
/// bytes are the counts themselves, so `GetCounts` just decodes them.
fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
    snapshot: &Snapshot,
    start: Bound<Vec<u8>>,
    end: Bound<Vec<u8>>,
) -> StorageResult<NsidCounter> {
    Ok(Box::new(snapshot.range((start, end)).map(|kv| {
        let (k_bytes, v_bytes) = kv?;
        let key = db_complete::<T>(&k_bytes)?;
        let nsid = key.collection().clone();
        let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
        Ok((nsid, get_counts))
    })))
}

type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;

/// Iterate a rank-rollup key range highest-rank-first (`.rev()`); rank keys
/// have empty values, so `GetCounts` does a secondary lookup into the counts
/// rollup via `get_rollup_key`.
fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
    snapshot: lsm_tree::Snapshot,
    start: Bound<Vec<u8>>,
    end: Bound<Vec<u8>>,
    get_rollup_key: GetRollupKey,
) -> StorageResult<NsidCounter> {
    Ok(Box::new(snapshot.range((start, end)).rev().map(
        move |kv| {
            let (k_bytes, _) = kv?;
            let key = db_complete::<T>(&k_bytes)?;
            let nsid = key.collection().clone();
            let get_counts: GetCounts = Box::new({
                let nsid = nsid.clone();
                let snapshot = snapshot.clone();
                let get_rollup_key = get_rollup_key.clone();
                move || {
                    let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
                        "integrity: all-time rank rollup must have corresponding all-time count rollup",
                    );
                    Ok(db_complete::<CountsValue>(&db_count_bytes)?)
                }
            });
            Ok((nsid, get_counts))
        },
    )))
}

// Per-collection time series of counts, keyed by collection nsid.
type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;

impl FjallReader {
    /// Register metric metadata once at startup (values are set in update_metrics).
    fn describe_metrics(&self) {
        describe_gauge!(
            "storage_fjall_l0_run_count",
            Unit::Count,
            "number of L0 runs in a partition"
        );
        describe_gauge!(
            "storage_fjall_keyspace_disk_space",
            Unit::Bytes,
            "total storage used according to fjall"
        );
        describe_gauge!(
            "storage_fjall_journal_count",
            Unit::Count,
            "total keyspace journals according to fjall"
        );
        describe_gauge!(
            "storage_fjall_keyspace_sequence",
            Unit::Count,
            "fjall keyspace sequence"
        );
    }

    /// Snapshot a few keyspace-level stats as JSON for diagnostics.
    fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
        let rollup_cursor =
            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
                .map(|c| c.to_raw_u64());

        Ok(serde_json::json!({
            "keyspace_disk_space": self.keyspace.disk_space(),
            "keyspace_journal_count": self.keyspace.journal_count(),
            "keyspace_sequence": self.keyspace.instant(),
            "rollup_cursor": rollup_cursor,
        }))
    }

    /// Read consumer state (endpoint, takeoff, cursors) from one consistent
    /// snapshot of the global partition.
    fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
        let global = self.global.snapshot();

        let endpoint =
            get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
                .ok_or(StorageError::BadStateError(
                    "Could not find jetstream endpoint".to_string(),
                ))?
                .0;

        let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
            .ok_or(StorageError::BadStateError(
                "Could not find jetstream takeoff time".to_string(),
            ))?
            .to_raw_u64();

        let latest_cursor =
            get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
                .map(|c| c.to_raw_u64());

        let rollup_cursor =
            get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
                .map(|c| c.to_raw_u64());

        Ok(ConsumerInfo::Jetstream {
            endpoint,
            started_at,
            latest_cursor,
            rollup_cursor,
        })
    }

    /// First hour present in the hourly rollups (i.e. the oldest data we have),
    /// falling back to the epoch start when there are no rollups yet.
    // NOTE(review): `unwrap_or(&...)` builds a throwaway snapshot even when one
    // is provided; harmless but could be restructured to avoid it.
    fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
        let cursor = rollups
            .unwrap_or(&self.rollups.snapshot())
            .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
            .next()
            .transpose()?
            .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
            .transpose()?
            .map(|key| key.cursor())
            .unwrap_or_else(|| Cursor::from_start().into());
        Ok(cursor)
    }

    /// List collections in nsid order, k-way-merging one lexicographic iterator
    /// per time bucket and summing counts for nsids that appear in several
    /// buckets. `cursor` (an encoded nsid) resumes a previous page.
    fn get_lexi_collections(
        &self,
        snapshot: Snapshot,
        limit: usize,
        cursor: Option<Vec<u8>>,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
        let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
        for bucket in &buckets {
            let it: NsidCounter = match bucket {
                CursorBucket::Hour(t) => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
                        .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
                    let end = HourlyRollupKey::end(*t)?;
                    get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::Week(t) => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
                        .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
                    let end = WeeklyRollupKey::end(*t)?;
                    get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::AllTime => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(AllTimeRollupKey::after_nsid)
                        .unwrap_or_else(AllTimeRollupKey::start)?;
                    let end = AllTimeRollupKey::end()?;
                    get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
                }
            };
            iters.push(it.peekable());
        }

        let mut out = Vec::new();
        let mut current_nsid = None;
        for _ in 0..limit {
            // double-scan the iters for each element: this could be eliminated but we're starting simple.
            // first scan: find the lowest nsid
            // second scan: take + merge, and advance all iters with lowest nsid
            let mut lowest: Option<Nsid> = None;
            for iter in &mut iters {
                if let Some(bla) = iter.peek_mut() {
                    let (nsid, _) = match bla {
                        Ok(v) => v,
                        // Stolen marks the error as already-taken so a later
                        // peek of this entry can't observe it twice.
                        Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                    };
                    lowest = match lowest {
                        Some(ref current) if nsid.as_str() > current.as_str() => lowest,
                        _ => Some(nsid.clone()),
                    };
                }
            }
            current_nsid = lowest.clone();
            let Some(nsid) = lowest else { break };

            let mut merged = CountsValue::default();
            for iter in &mut iters {
                // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
                // `if let` (not `while let`): each bucket holds at most one entry per nsid.
                if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
                    let counts = get_counts()?;
                    merged.merge(&counts);
                }
            }
            out.push(NsidCount::new(&nsid, &merged));
        }

        // Cursor = last emitted nsid; next page starts strictly after it.
        let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
        Ok((out, next_cursor))
    }

    /// List collections ordered by records-created or dids-estimate, merging
    /// rank rollups across buckets. Not paginated (no cursor) and approximate
    /// across bucket boundaries -- see the inline caveats.
    fn get_ordered_collections(
        &self,
        snapshot: Snapshot,
        limit: usize,
        order: OrderCollectionsBy,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<Vec<NsidCount>> {
        let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());

        for bucket in buckets {
            let it: NsidCounter = match (&order, bucket) {
                (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
                    get_lookup_iter::<HourlyRecordsKey>(
                        snapshot.clone(),
                        HourlyRecordsKey::start(t)?,
                        HourlyRecordsKey::end(t)?,
                        Arc::new({
                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
                    get_lookup_iter::<HourlyDidsKey>(
                        snapshot.clone(),
                        HourlyDidsKey::start(t)?,
                        HourlyDidsKey::end(t)?,
                        Arc::new({
                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
                    get_lookup_iter::<WeeklyRecordsKey>(
                        snapshot.clone(),
                        WeeklyRecordsKey::start(t)?,
                        WeeklyRecordsKey::end(t)?,
                        Arc::new({
                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
                    get_lookup_iter::<WeeklyDidsKey>(
                        snapshot.clone(),
                        WeeklyDidsKey::start(t)?,
                        WeeklyDidsKey::end(t)?,
                        Arc::new({
                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
                    get_lookup_iter::<AllTimeRecordsKey>(
                        snapshot.clone(),
                        AllTimeRecordsKey::start()?,
                        AllTimeRecordsKey::end()?,
                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
                    get_lookup_iter::<AllTimeDidsKey>(
                        snapshot.clone(),
                        AllTimeDidsKey::start()?,
                        AllTimeDidsKey::end()?,
                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
                    )?
                }
                // Lexi ordering is routed to get_lexi_collections by the caller.
                (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
            };
            iters.push(it);
        }

        // overfetch by taking a bit more than the limit
        // merge by collection
        // sort by requested order, take limit, discard all remaining
        //
        // this isn't guaranteed to be correct, but it will hopefully be close most of the time:
        // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
        // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah.
        //
        // this thing is heavy, there's probably a better way
        let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
        for iter in iters {
            for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
                let (nsid, get_counts) = pair?;
                let counts = get_counts()?;
                ranked.entry(nsid).or_default().merge(&counts);
            }
        }
        let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
        match order {
            OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
            OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
            OrderCollectionsBy::Lexi { .. } => unreachable!(),
        }
        // Ascending sort + rev = descending; take the top `limit`.
        let counts = ranked
            .into_iter()
            .rev()
            .take(limit)
            .map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
            .collect();
        Ok(counts)
    }

    /// Entry point for collection listings: picks the time buckets to consult
    /// (all-time rollup when no bounds given, else hour/week buckets spanning
    /// the range) and dispatches on the requested ordering.
    fn get_collections(
        &self,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let snapshot = self.rollups.snapshot();
        let buckets = if let (None, None) = (since, until) {
            vec![CursorBucket::AllTime]
        } else {
            // Clamp `since` to the earliest data we actually hold.
            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
            if let Some(specified) = since {
                if specified > lower {
                    lower = specified;
                }
            }
            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
            CursorBucket::buckets_spanning(lower, upper)
        };
        match order {
            OrderCollectionsBy::Lexi { cursor } => {
                self.get_lexi_collections(snapshot, limit, cursor, buckets)
            }
            _ => Ok((
                self.get_ordered_collections(snapshot, limit, order, buckets)?,
                None,
            )),
        }
    }

    /// Lexicographic listing of an nsid prefix's children: full collections
    /// that live directly under the prefix, and sub-prefixes one segment
    /// deeper. Also accumulates a grand total across everything scanned.
    fn get_lexi_prefix(
        &self,
        snapshot: Snapshot,
        prefix: NsidPrefix,
        limit: usize,
        cursor: Option<Vec<u8>>,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
        let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
        let cursor_child = cursor
            .as_deref()
            .map(|encoded_bytes| {
                let decoded: String = db_complete(encoded_bytes)?;
                // TODO: write some tests for cursors, there's probably bugs here
                let as_sub_prefix_with_null = decoded.to_db_bytes()?;
                Ok::<_, EncodingError>(as_sub_prefix_with_null)
            })
            .transpose()?;
        let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
        for bucket in &buckets {
            let it: NsidCounter = match bucket {
                CursorBucket::Hour(t) => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
                        .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
                    let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
                    get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::Week(t) => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
                        .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
                    let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
                    get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::AllTime => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| AllTimeRollupKey::after_nsid_prefix(child))
                        .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
                    let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
                    get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
                }
            };
            iters.push(it);
        }

        // with apologies
        // Lift every raw nsid into a Child (collection vs sub-prefix) so the
        // merge below can compare mixed kinds.
        let mut iters: Vec<_> = iters
            .into_iter()
            .map(|it| {
                it.map(|bla| {
                    bla.map(|(nsid, v)| {
                        let Some(child) = Child::from_prefix(&nsid, &prefix) else {
                            panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
                        };
                        (child, v)
                    })
                })
                .peekable()
            })
            .collect();

        let mut items = Vec::new();
        let mut prefix_count = CountsValue::default();
        #[derive(Debug, Clone, PartialEq)]
        enum Child {
            FullNsid(Nsid),
            ChildPrefix(String),
        }
        impl Child {
            /// Classify an nsid relative to `prefix`: a direct member becomes
            /// FullNsid; a deeper descendant collapses to its first extra
            /// segment as a ChildPrefix. None if the nsid isn't under prefix.
            fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
                if prefix.is_group_of(nsid) {
                    return Some(Child::FullNsid(nsid.clone()));
                }
                let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
                // unwrap: not a direct member (checked above), so the suffix
                // has at least one more '.'-separated segment.
                let (segment, _) = suffix.split_once('.').unwrap();
                let child_prefix = format!("{}.{segment}", prefix.0);
                Some(Child::ChildPrefix(child_prefix))
            }
            /// Merge ordering across the two kinds; when the strings tie, a
            /// full nsid sorts before the equally-named sub-prefix.
            fn is_before(&self, other: &Child) -> bool {
                match (self, other) {
                    (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
                    (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
                    (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
                    (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
                    (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
                    (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
                }
            }
            fn into_inner(self) -> String {
                match self {
                    Child::FullNsid(s) => s.to_string(),
                    Child::ChildPrefix(s) => s,
                }
            }
        }
        let mut current_child: Option<Child> = None;
        for _ in 0..limit {
            // double-scan the iters for each element: this could be eliminated but we're starting simple.
            // first scan: find the lowest nsid
            // second scan: take + merge, and advance all iters with lowest nsid
            let mut lowest: Option<Child> = None;
            for iter in &mut iters {
                if let Some(bla) = iter.peek_mut() {
                    let (child, _) = match bla {
                        Ok(v) => v,
                        Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                    };

                    lowest = match lowest {
                        Some(ref current) if current.is_before(child) => lowest,
                        _ => Some(child.clone()),
                    };
                }
            }
            current_child = lowest.clone();
            let Some(child) = lowest else { break };

            let mut merged = CountsValue::default();
            for iter in &mut iters {
                // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
                // `while let`: many distinct nsids can collapse to one ChildPrefix.
                while let Some(Ok((_, get_counts))) =
                    iter.next_if(|v| v.as_ref().unwrap().0 == child)
                {
                    let counts = get_counts()?;
                    prefix_count.merge(&counts);
                    merged.merge(&counts);
                }
            }
            items.push(match child {
                Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
                Child::ChildPrefix(prefix) => {
                    PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
                }
            });
        }

        // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
        // ....er the sketch is probably too big
        // TODO: this is probably buggy on child-type boundaries bleh
        let next_cursor = current_child
            .map(|s| s.into_inner().to_db_bytes())
            .transpose()?;

        Ok(((&prefix_count).into(), items, next_cursor))
    }

    /// Prefix listing entry point: bucket selection mirrors get_collections.
    /// Only lexicographic order is implemented so far.
    fn get_prefix(
        &self,
        prefix: NsidPrefix,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        let snapshot = self.rollups.snapshot();
        let buckets = if let (None, None) = (since, until) {
            vec![CursorBucket::AllTime]
        } else {
            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
            if let Some(specified) = since {
                if specified > lower {
                    lower = specified;
                }
            }
            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
            CursorBucket::buckets_spanning(lower, upper)
        };
        match order {
            OrderCollectionsBy::Lexi { cursor } => {
                self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
            }
            _ => todo!(),
        }
    }

    /// - step: output series time step, in seconds
    ///
    /// Builds hourly counts for each collection over [since, until), then
    /// merges them into `step`-sized output buckets.
    // NOTE(review): `step` is documented in seconds but the guard compares it
    // against WEEK_IN_MICROS (micros) -- it only trips for absurd values.
    // Verify intended units; dt truncation also drops a trailing partial hour.
    fn get_timeseries(
        &self,
        collections: Vec<Nsid>,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
        if step > WEEK_IN_MICROS {
            panic!("week-stepping is todo");
        }
        let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
        let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
            return Ok((
                // empty: until < since
                vec![],
                collections.into_iter().map(|c| (c, vec![])).collect(),
            ));
        };
        let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
        let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
        let snapshot = self.rollups.snapshot();
        for hour in (0..n_hours).map(|i| since.nth_next(i)) {
            let mut counts = Vec::with_capacity(collections.len());
            for nsid in &collections {
                // Missing rollup => zero counts for that hour.
                let count = snapshot
                    .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
                    .as_deref()
                    .map(db_complete::<CountsValue>)
                    .transpose()?
                    .unwrap_or_default();
                counts.push(count);
            }
            counts_by_hour.push((hour, counts));
        }

        // seconds -> hours (HOUR_IN_MICROS / 1_000_000 = seconds per hour)
        let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
        // NOTE(review): capacity is mislabeled -- the output holds roughly
        // n_hours / step_hours entries, not step_hours. Harmless.
        let mut output_hours = Vec::with_capacity(step_hours as usize);
        let mut output_series: CollectionSerieses = collections
            .iter()
            .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
            .collect();

        for chunk in counts_by_hour.chunks(step_hours as usize) {
            output_hours.push(chunk[0].0); // always guaranteed to have at least one element in a chunks chunk
            for (i, collection) in collections.iter().enumerate() {
                let mut c = CountsValue::default();
                for (_, counts) in chunk {
                    c.merge(&counts[i]);
                }
                output_series
                    .get_mut(collection)
                    .expect("output series is initialized with all collections")
                    .push(c);
            }
        }

        Ok((output_hours, output_series))
    }

    /// Total counts for one collection over [since, until), summing whichever
    /// hourly/weekly rollup buckets span the range.
    fn get_collection_counts(
        &self,
        collection: &Nsid,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<JustCount> {
        // grab snapshots in case rollups happen while we're working
        let rollups = self.rollups.snapshot();

        let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
        let buckets = CursorBucket::buckets_spanning(since, until);
        let mut total_counts = CountsValue::default();

        for bucket in buckets {
            let key = match bucket {
                CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
                CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
                CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
            };
            let count = rollups
                .get(&key)?
                .as_deref()
                .map(db_complete::<CountsValue>)
                .transpose()?
                .unwrap_or_default();
            total_counts.merge(&count);
        }

        Ok((&total_counts).into())
    }

    /// Newest-first merge of per-collection record feeds. With
    /// `expand_each_collection`, each collection yields up to `limit` records;
    /// otherwise the merge winds down once any collection hits its limit.
    fn get_records_by_collections(
        &self,
        collections: HashSet<Nsid>,
        limit: usize,
        expand_each_collection: bool,
    ) -> StorageResult<Vec<UFOsRecord>> {
        if collections.is_empty() {
            return Ok(vec![]);
        }
        let mut record_iterators = Vec::new();
        for collection in collections {
            let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
            record_iterators.push(iter.peekable());
        }
        let mut merged = Vec::new();
        loop {
            // (newest cursor seen this pass, index of the iterator holding it)
            let mut latest: Option<(Cursor, usize)> = None; // ugh
            for (i, iter) in record_iterators.iter_mut().enumerate() {
                let Some(it) = iter.peek_mut() else {
                    continue;
                };
                let it = match it {
                    Ok(v) => v,
                    Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                };
                // Inner None = this collection reached its limit.
                let Some(rec) = it else {
                    if expand_each_collection {
                        continue;
                    } else {
                        // NOTE(review): this only breaks the scan loop; a
                        // candidate found before this iterator may still be
                        // pushed below -- confirm this is the intended
                        // wind-down behavior for the shared-limit mode.
                        break;
                    }
                };
                if let Some((cursor, _)) = latest {
                    if rec.cursor > cursor {
                        latest = Some((rec.cursor, i))
                    }
                } else {
                    latest = Some((rec.cursor, i));
                }
            }
            let Some((_, idx)) = latest else {
                break;
            };
            // yeah yeah whateverrrrrrrrrrrrrrrr
            // unwraps: peeked above as Some(Ok(Some(_))).
            merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
        }
        Ok(merged)
    }

    /// Substring search over all-time collection rollups (linear scan, first 16
    /// matches in nsid order).
    fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
        let start = AllTimeRollupKey::start()?;
        let end = AllTimeRollupKey::end()?;
        let mut matches = Vec::new();
        let limit = 16; // TODO: param
        for kv in self.rollups.range((start, end)) {
            let (key_bytes, val_bytes) = kv?;
            let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
            let nsid = key.collection();
            for term in &terms {
                if nsid.contains(term) {
                    let counts = db_complete::<CountsValue>(&val_bytes)?;
                    matches.push(NsidCount::new(nsid, &counts));
                    break; // one match per nsid is enough
                }
            }
            if matches.len() >= limit {
                break;
            }
        }
        // TODO: indicate incomplete results
        Ok(matches)
    }
}

#[async_trait]
impl StoreReader for FjallReader {
    fn name(&self) -> String {
        "fjall storage v2".into()
    }
    /// Push current fjall/keyspace stats into the metrics registry.
    fn update_metrics(&self) {
        gauge!("storage_fjall_l0_run_count", "partition" => "global")
            .set(self.global.tree.l0_run_count() as f64);
        gauge!("storage_fjall_l0_run_count", "partition" => "feeds")
            .set(self.feeds.tree.l0_run_count() as f64);
        gauge!("storage_fjall_l0_run_count", "partition" => "records")
            .set(self.records.tree.l0_run_count() as f64);
        gauge!("storage_fjall_l0_run_count", "partition" => "rollups")
            .set(self.rollups.tree.l0_run_count() as f64);
        gauge!("storage_fjall_l0_run_count", "partition" => "queues")
            .set(self.queues.tree.l0_run_count() as f64);
        gauge!("storage_fjall_keyspace_disk_space").set(self.keyspace.disk_space() as f64);
        gauge!("storage_fjall_journal_count").set(self.keyspace.journal_count() as f64);
        gauge!("storage_fjall_keyspace_sequence").set(self.keyspace.instant() as f64);
    }
    // Async facades: the blocking fjall reads run on tokio's blocking pool.
    async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await?
    }
    async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
    }
    async fn get_collections(
        &self,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_collections(&s, limit, order, since, until)
        })
        .await?
    }
    async fn get_prefix(
        &self,
        prefix: NsidPrefix,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_prefix(&s, prefix, limit, order, since, until)
        })
        .await?
    }
    async fn get_timeseries(
        &self,
        collections: Vec<Nsid>,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_timeseries(&s, collections, since, until, step)
        })
        .await?
    }
    async fn get_collection_counts(
        &self,
        collection: &Nsid,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<JustCount> {
        let s = self.clone();
        // clone so the 'static spawn_blocking closure can own the Nsid
        let collection = collection.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_collection_counts(&s, &collection, since, until)
        })
        .await?
    }
    async fn get_records_by_collections(
        &self,
        collections: HashSet<Nsid>,
        limit: usize,
        expand_each_collection: bool,
    ) -> StorageResult<Vec<UFOsRecord>> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
        })
        .await?
    }
    async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
    }
}

/// Write half of the fjall storage. `Clone` so background tasks can share it;
/// all clones share the same keyspace and partition handles.
#[derive(Clone)]
pub struct FjallWriter {
    // flipped once when background_tasks() is taken, preventing a second start
    bg_taken: Arc<AtomicBool>,
    keyspace: Keyspace,
    global: PartitionHandle,
    feeds: PartitionHandle,
    records: PartitionHandle,
    rollups: PartitionHandle,
    queues: PartitionHandle,
}

impl FjallWriter {
    /// Register units and descriptions for the writer-side metrics.
    fn describe_metrics(&self) {
        describe_histogram!(
            "storage_insert_batch_db_batch_items",
            Unit::Count,
            "how many items are in the fjall batch for batched inserts"
        );
        describe_histogram!(
            "storage_insert_batch_db_batch_size",
            Unit::Count,
            "in-memory size of the fjall batch for batched inserts"
        );
        describe_histogram!(
            "storage_rollup_counts_db_batch_items",
            Unit::Count,
            "how many items are in the fjall batch for a timlies rollup"
        );
        describe_histogram!(
            "storage_rollup_counts_db_batch_size",
            Unit::Count,
            "in-memory size of the fjall batch for a timelies rollup"
        );
        describe_counter!(
            "storage_delete_account_partial_commits",
            Unit::Count,
            "fjall checkpoint commits for cleaning up accounts with too many records"
        );
        describe_counter!(
            "storage_delete_account_completions",
            Unit::Count,
            "total count of account deletes handled"
        );
        describe_counter!(
            "storage_delete_account_records_deleted",
            Unit::Count,
            "total
            records deleted when handling account deletes"
        );
        describe_histogram!(
            "storage_trim_dirty_nsids",
            Unit::Count,
            "number of NSIDs trimmed"
        );
        describe_histogram!(
            "storage_trim_duration",
            Unit::Microseconds,
            "how long it took to trim the dirty NSIDs"
        );
        describe_counter!(
            "storage_trim_removed",
            Unit::Count,
            "how many records were removed during trim"
        );
    }
    /// Handle one queued account delete: purge the account's records, then
    /// atomically drop the queue entry and advance the rollup cursor.
    /// Returns 1 (one queue cursor consumed).
    fn rollup_delete_account(
        &mut self,
        cursor: Cursor,
        key_bytes: &[u8],
        val_bytes: &[u8],
    ) -> StorageResult<usize> {
        let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
        self.delete_account(&did)?;
        let mut batch = self.keyspace.batch();
        batch.remove(&self.queues, key_bytes);
        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
        batch.commit()?;
        Ok(1)
    }

    /// Fold pending live-count entries into hourly/weekly/all-time rollups and
    /// keep the rank-ordered secondary indexes in sync.
    ///
    /// Stops at `rollup_limit` entries or at `cursor_exclusive_limit` (used to
    /// yield to an older queued account delete). Returns the number of live
    /// count cursors consumed and the set of collections touched.
    fn rollup_live_counts(
        &mut self,
        timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
        cursor_exclusive_limit: Option<Cursor>,
        rollup_limit: usize,
    ) -> StorageResult<(usize, HashSet<Nsid>)> {
        // current strategy is to buffer counts in mem before writing the rollups
        // we *could* read+write every single batch to rollup.. but their merge is associative so
        // ...so save the db some work up front? is this worth it? who knows...

        let mut dirty_nsids = HashSet::new();

        // the three granularities each live-count entry contributes to
        #[derive(Eq, Hash, PartialEq)]
        enum Rollup {
            Hourly(HourTruncatedCursor),
            Weekly(WeekTruncatedCursor),
            AllTime,
        }

        let mut batch = self.keyspace.batch();
        let mut cursors_advanced = 0;
        let mut last_cursor = Cursor::from_start();
        let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();

        for (i, kv) in timelies.enumerate() {
            if i >= rollup_limit {
                break;
            }

            let (key_bytes, val_bytes) = kv?;
            let key = db_complete::<LiveCountsKey>(&key_bytes)?;

            // stop before any entry newer than the caller-imposed limit
            if cursor_exclusive_limit
                .map(|limit| key.cursor() > limit)
                .unwrap_or(false)
            {
                break;
            }

            dirty_nsids.insert(key.collection().clone());

            // consume the live-count entry; its value is merged into all three buckets
            batch.remove(&self.rollups, key_bytes);
            let val = db_complete::<CountsValue>(&val_bytes)?;
            counts_by_rollup
                .entry((
                    key.collection().clone(),
                    Rollup::Hourly(key.cursor().into()),
                ))
                .or_default()
                .merge(&val);
            counts_by_rollup
                .entry((
                    key.collection().clone(),
                    Rollup::Weekly(key.cursor().into()),
                ))
                .or_default()
                .merge(&val);
            counts_by_rollup
                .entry((key.collection().clone(), Rollup::AllTime))
                .or_default()
                .merge(&val);

            cursors_advanced += 1;
            last_cursor = key.cursor();
        }

        // go through each new rollup thing and merge it with whatever might already be in the db
        for ((nsid, rollup), counts) in counts_by_rollup {
            let rollup_key_bytes = match rollup {
                Rollup::Hourly(hourly_cursor) => {
                    HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
                }
                Rollup::Weekly(weekly_cursor) => {
                    WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
                }
                Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
            };
            let mut rolled: CountsValue = self
                .rollups
                .get(&rollup_key_bytes)?
                .as_deref()
                .map(db_complete::<CountsValue>)
                .transpose()?
                .unwrap_or_default();

            // now that we have values, we can know the exising ranks
            let before_creates_count = rolled.counts().creates;
            let before_dids_estimate = rolled.dids().estimate() as u64;

            // update the rollup
            rolled.merge(&counts);

            // new ranks
            let new_creates_count = rolled.counts().creates;
            let new_dids_estimate = rolled.dids().estimate() as u64;

            // update create-ranked secondary index if rank changed
            if new_creates_count != before_creates_count {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
                        AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
                    ),
                };
                // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
                batch.remove_weak(&self.rollups, &old_k);
                batch.insert(&self.rollups, &new_k, "");
            }

            // update dids-ranked secondary index if rank changed
            if new_dids_estimate != before_dids_estimate {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
                        AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
                    ),
                };
                // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
                batch.remove_weak(&self.rollups, &old_k);
                batch.insert(&self.rollups, &new_k, "");
            }

            // replace the main counts rollup
            batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
        }

        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;

        histogram!("storage_rollup_counts_db_batch_items").record(batch.len() as f64);
        histogram!("storage_rollup_counts_db_batch_size")
            .record(std::mem::size_of_val(&batch) as f64);
        batch.commit()?;
        Ok((cursors_advanced, dirty_nsids))
    }
}

impl StoreWriter<FjallBackground> for FjallWriter {
    /// Hand out the background-task driver (at most once per process).
    /// With `reroll`, the rollup cursor is reset to the start and all saved
    /// per-collection trim cursors are cleared so everything re-rolls.
    fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
        if self.bg_taken.swap(true, Ordering::SeqCst) {
            return Err(StorageError::BackgroundAlreadyStarted);
        }
        if reroll {
            log::info!("reroll: resetting rollup cursor...");
            insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
            log::info!("reroll: clearing trim cursors...");
            let mut batch = self.keyspace.batch();
            for kv in self
                .global
                .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
                    &Default::default(),
                )?)
            {
                let (k, _) = kv?;
                batch.remove(&self.global, k);
            }
            let n = batch.len();
            batch.commit()?;
            log::info!("reroll: cleared {n} trim cursors.");
        }
        Ok(FjallBackground(self.clone()))
    }

    /// Persist one jetstream event batch in a single atomic fjall batch:
    /// record puts/cuts, feed entries, per-collection live counts, queued
    /// account deletes, and finally the consumer cursor.
    fn insert_batch<const LIMIT: usize>(
        &mut self,
        event_batch: EventBatch<LIMIT>,
    ) -> StorageResult<()> {
        if event_batch.is_empty() {
            return Ok(());
        }

        let mut batch = self.keyspace.batch();

        // would be nice not to have to iterate everything at once here
        let latest = event_batch.latest_cursor().unwrap();

        for (nsid, commits) in event_batch.commits_by_nsid {
            for commit in commits.commits {
                let location_key: RecordLocationKey = (&commit, &nsid).into();

                match commit.action {
                    CommitAction::Cut => {
                        batch.remove(&self.records, &location_key.to_db_bytes()?);
                    }
                    CommitAction::Put(put_action) => {
                        // feed entry: collection+cursor -> (did, rkey, rev)
                        let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
                        let feed_val: NsidRecordFeedVal =
                            (&commit.did, &commit.rkey, commit.rev.as_str()).into();
                        batch.insert(
                            &self.feeds,
                            feed_key.to_db_bytes()?,
                            feed_val.to_db_bytes()?,
                        );

                        // record body keyed by its location
                        let location_val: RecordLocationVal =
                            (commit.cursor, commit.rev.as_str(), put_action).into();
                        batch.insert(
                            &self.records,
                            &location_key.to_db_bytes()?,
                            &location_val.to_db_bytes()?,
                        );
                    }
                }
            }
            // per-collection live counts; folded into rollups later by step_rollup
            let live_counts_key: LiveCountsKey = (latest, &nsid).into();
            let counts_value = CountsValue::new(
                CommitCounts {
                    creates: commits.creates as u64,
                    updates: commits.updates as u64,
                    deletes: commits.deletes as u64,
                },
                commits.dids_estimate,
            );
            batch.insert(
                &self.rollups,
                &live_counts_key.to_db_bytes()?,
                &counts_value.to_db_bytes()?,
            );
        }

        for remove in event_batch.account_removes {
            let queue_key = DeleteAccountQueueKey::new(remove.cursor);
            let queue_val: DeleteAccountQueueVal = remove.did;
            batch.insert(
                &self.queues,
                &queue_key.to_db_bytes()?,
                &queue_val.to_db_bytes()?,
            );
        }

        // advance the persisted jetstream consumer cursor
        batch.insert(
            &self.global,
            DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
            latest.to_db_bytes()?,
        );

        histogram!("storage_insert_batch_db_batch_items").record(batch.len() as f64);
        histogram!("storage_insert_batch_db_batch_size")
            .record(std::mem::size_of_val(&batch) as f64);
        batch.commit()?;
        Ok(())
    }

    /// Advance the rollup cursor by one unit of work: either a chunk of live
    /// counts or a single queued account delete — whichever is older.
    /// Returns (cursors stepped, collections made dirty).
    fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
        let mut dirty_nsids = HashSet::new();

        let rollup_cursor =
            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
                StorageError::BadStateError("Could not find current rollup cursor".to_string()),
            )?;

        // timelies
        let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
        let mut timely_iter = self.rollups.range(live_counts_range).peekable();

        let timely_next = timely_iter
            .peek_mut()
            .map(|kv| -> StorageResult<LiveCountsKey> {
                match kv {
                    // swap the peeked error out (placeholder in) so it can be returned by value
                    Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
                    Ok((key_bytes, _)) => {
                        let key = db_complete::<LiveCountsKey>(key_bytes)?;
                        Ok(key)
                    }
                }
            })
            .transpose()?;

        // delete accounts
        let delete_accounts_range =
            DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;

        let next_delete = self
            .queues
            .range(delete_accounts_range)
            .next()
            .transpose()?
1506 .map(|(key_bytes, val_bytes)| { 1507 db_complete::<DeleteAccountQueueKey>(&key_bytes) 1508 .map(|k| (k.suffix, key_bytes, val_bytes)) 1509 }) 1510 .transpose()?; 1511 1512 let cursors_stepped = match (timely_next, next_delete) { 1513 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 1514 if timely.cursor() < delete_cursor { 1515 let (n, dirty) = self.rollup_live_counts( 1516 timely_iter, 1517 Some(delete_cursor), 1518 MAX_BATCHED_ROLLUP_COUNTS, 1519 )?; 1520 dirty_nsids.extend(dirty); 1521 n 1522 } else { 1523 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 1524 } 1525 } 1526 (Some(_), None) => { 1527 let (n, dirty) = 1528 self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?; 1529 dirty_nsids.extend(dirty); 1530 n 1531 } 1532 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 1533 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 1534 } 1535 (None, None) => 0, 1536 }; 1537 1538 Ok((cursors_stepped, dirty_nsids)) 1539 } 1540 1541 fn trim_collection( 1542 &mut self, 1543 collection: &Nsid, 1544 limit: usize, 1545 full_scan: bool, 1546 ) -> StorageResult<(usize, usize, bool)> { 1547 let mut dangling_feed_keys_cleaned = 0; 1548 let mut records_deleted = 0; 1549 1550 let live_range = if full_scan { 1551 let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 1552 let end = NsidRecordFeedKey::prefix_range_end(collection)?; 1553 start..end 1554 } else { 1555 let feed_trim_cursor_key = 1556 TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?; 1557 let trim_cursor = self 1558 .global 1559 .get(&feed_trim_cursor_key)? 1560 .map(|value_bytes| db_complete(&value_bytes)) 1561 .transpose()? 1562 .unwrap_or(Cursor::from_start()); 1563 NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()? 
1564 }; 1565 1566 let mut live_records_found = 0; 1567 let mut candidate_new_feed_lower_cursor = None; 1568 let ended_early = false; 1569 let mut current_cursor: Option<Cursor> = None; 1570 for (i, kv) in self.feeds.range(live_range).rev().enumerate() { 1571 if i > 0 && i % 500_000 == 0 { 1572 log::info!( 1573 "trim: at {i} for {:?} (now at {})", 1574 collection.to_string(), 1575 current_cursor 1576 .map(|c| c 1577 .elapsed() 1578 .map(nice_duration) 1579 .unwrap_or("[not past]".into())) 1580 .unwrap_or("??".into()), 1581 ); 1582 } 1583 let (key_bytes, val_bytes) = kv?; 1584 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 1585 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 1586 let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 1587 let location_key_bytes = location_key.to_db_bytes()?; 1588 1589 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 1590 // record was deleted (hopefully) 1591 self.feeds.remove(&*key_bytes)?; 1592 dangling_feed_keys_cleaned += 1; 1593 continue; 1594 }; 1595 1596 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 1597 current_cursor = Some(meta.cursor()); 1598 1599 if meta.cursor() != feed_key.cursor() { 1600 // older/different version 1601 self.feeds.remove(&*key_bytes)?; 1602 dangling_feed_keys_cleaned += 1; 1603 continue; 1604 } 1605 if meta.rev != feed_val.rev() { 1606 // weird... 1607 log::warn!("record lookup: cursor match but rev did not...? 
removing."); 1608 self.records.remove(&location_key_bytes)?; 1609 self.feeds.remove(&*key_bytes)?; 1610 dangling_feed_keys_cleaned += 1; 1611 continue; 1612 } 1613 1614 live_records_found += 1; 1615 if live_records_found <= limit { 1616 continue; 1617 } 1618 if candidate_new_feed_lower_cursor.is_none() { 1619 candidate_new_feed_lower_cursor = Some(feed_key.cursor()); 1620 } 1621 1622 self.feeds.remove(&location_key_bytes)?; 1623 self.feeds.remove(key_bytes)?; 1624 records_deleted += 1; 1625 } 1626 1627 if !ended_early { 1628 if let Some(new_cursor) = candidate_new_feed_lower_cursor { 1629 self.global.insert( 1630 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?, 1631 &new_cursor.to_db_bytes()?, 1632 )?; 1633 } 1634 } 1635 1636 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})"); 1637 Ok((dangling_feed_keys_cleaned, records_deleted, ended_early)) 1638 } 1639 1640 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> { 1641 let mut records_deleted = 0; 1642 let mut batch = self.keyspace.batch(); 1643 let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?; 1644 for kv in self.records.prefix(prefix) { 1645 let (key_bytes, _) = kv?; 1646 batch.remove(&self.records, key_bytes); 1647 records_deleted += 1; 1648 if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS { 1649 counter!("storage_delete_account_partial_commits").increment(1); 1650 batch.commit()?; 1651 batch = self.keyspace.batch(); 1652 } 1653 } 1654 counter!("storage_delete_account_completions").increment(1); 1655 counter!("storage_delete_account_records_deleted").increment(records_deleted as u64); 1656 batch.commit()?; 1657 Ok(records_deleted) 1658 } 1659} 1660 1661pub struct FjallBackground(FjallWriter); 1662 1663#[async_trait] 1664impl StoreBackground for FjallBackground { 1665 async fn run(mut self, backfill: bool) -> StorageResult<()> { 1666 let mut 
        dirty_nsids = HashSet::new();

        // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims
        // shorter once those are done helps things catch up
        // the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine
        let mut rollup =
            tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
        rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // backfill condition again iffy. collection trims should probably happen in their own phase.
        let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
        trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // rollup tick: fold one unit of live counts / account deletes
                _ = rollup.tick() => {
                    let mut db = self.0.clone();
                    let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
                    if n == 0 {
                        rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
                    }
                    dirty_nsids.extend(dirty);
                    log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
                },
                // trim tick: trim every collection dirtied since the last pass
                _ = trim.tick() => {
                    let n = dirty_nsids.len();
                    log::trace!("trimming {n} nsids: {dirty_nsids:?}");
                    let t0 = Instant::now();
                    let (mut total_danglers, mut total_deleted) = (0, 0);
                    let mut completed = HashSet::new();
                    for collection in &dirty_nsids {
                        let mut db = self.0.clone();
                        let c = collection.clone();
                        let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
                        total_danglers += danglers;
                        total_deleted += deleted;
                        if !ended_early {
                            completed.insert(collection.clone());
                        }
                        if total_deleted > 10_000_000 {
                            log::info!("trim stopped early, more than 10M records already deleted.");
                            break;
                        }
                    }
                    let dt = t0.elapsed();
                    log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
                    histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
                    histogram!("storage_trim_duration").record(dt.as_micros() as f64);
                    counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
                    if total_deleted >= total_danglers {
                        counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
                    } else {
                        // TODO: probably think through what's happening here
                        log::warn!("weird trim case: more danglers than deleted? metric will be missing for dangling=false. deleted={total_deleted} danglers={total_danglers}");
                    }
                    // only fully-trimmed collections leave the dirty set
                    for c in completed {
                        dirty_nsids.remove(&c);
                    }
                },
            };
        }
    }
}

/// Get a value from a fixed key
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Get a value from a fixed key (snapshot-reading variant)
fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
    global: &fjall::Snapshot,
) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Set a value to a fixed key
fn insert_static_neu<K: StaticStr>(
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value_bytes = value.to_db_bytes()?;
    global.insert(&key_bytes, &value_bytes)?;
    Ok(())
}

/// Set a value to a fixed key, erroring if the value already exists
///
/// Intended for single-threaded init: not safe under concurrency, since there
/// is no transaction between checking if the already exists and writing it.
fn init_static_neu<K: StaticStr>(
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    if global.get(&key_bytes)?.is_some() {
        return Err(StorageError::InitError(format!(
            "init failed: value for key {key_bytes:?} already exists"
        )));
    }
    let value_bytes = value.to_db_bytes()?;
    global.insert(&key_bytes, &value_bytes)?;
    Ok(())
}

/// Set a value to a fixed key (batched variant: queued, not committed here)
fn insert_batch_static_neu<K: StaticStr>(
    batch: &mut FjallBatch,
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value_bytes = value.to_db_bytes()?;
    batch.insert(global, &key_bytes, &value_bytes);
    Ok(())
}

/// Snapshot of keyspace-level storage stats, serialized for the stats endpoint.
#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
pub struct StorageInfo {
    pub keyspace_disk_space: u64,
    pub keyspace_journal_count: usize,
    pub keyspace_sequence: u64,
    pub global_approximate_len: usize,
}

////////// temp stuff to remove:

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DeleteAccount, RecordKey, UFOsCommit};
    use jetstream::events::{CommitEvent, CommitOp};
use jetstream::exports::Cid; 1810 use serde_json::value::RawValue; 1811 1812 fn fjall_db() -> (FjallReader, FjallWriter) { 1813 let (read, write, _, _) = FjallStorage::init( 1814 tempfile::tempdir().unwrap(), 1815 "offline test (no real jetstream endpoint)".to_string(), 1816 false, 1817 FjallConfig { temp: true }, 1818 ) 1819 .unwrap(); 1820 (read, write) 1821 } 1822 1823 const TEST_BATCH_LIMIT: usize = 16; 1824 fn beginning() -> HourTruncatedCursor { 1825 Cursor::from_start().into() 1826 } 1827 1828 #[derive(Debug, Default)] 1829 struct TestBatch { 1830 pub batch: EventBatch<TEST_BATCH_LIMIT>, 1831 } 1832 1833 impl TestBatch { 1834 #[allow(clippy::too_many_arguments)] 1835 pub fn create( 1836 &mut self, 1837 did: &str, 1838 collection: &str, 1839 rkey: &str, 1840 record: &str, 1841 rev: Option<&str>, 1842 cid: Option<Cid>, 1843 cursor: u64, 1844 ) -> Nsid { 1845 let did = Did::new(did.to_string()).unwrap(); 1846 let collection = Nsid::new(collection.to_string()).unwrap(); 1847 let record = RawValue::from_string(record.to_string()).unwrap(); 1848 let cid = cid.unwrap_or( 1849 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1850 .parse() 1851 .unwrap(), 1852 ); 1853 1854 let event = CommitEvent { 1855 collection, 1856 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1857 rev: rev.unwrap_or("asdf").to_string(), 1858 operation: CommitOp::Create, 1859 record: Some(record), 1860 cid: Some(cid), 1861 }; 1862 1863 let (commit, collection) = 1864 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1865 .unwrap(); 1866 1867 self.batch 1868 .commits_by_nsid 1869 .entry(collection.clone()) 1870 .or_default() 1871 .truncating_insert(commit, &[0u8; 16]) 1872 .unwrap(); 1873 1874 collection 1875 } 1876 #[allow(clippy::too_many_arguments)] 1877 pub fn update( 1878 &mut self, 1879 did: &str, 1880 collection: &str, 1881 rkey: &str, 1882 record: &str, 1883 rev: Option<&str>, 1884 cid: Option<Cid>, 1885 cursor: u64, 1886 ) -> Nsid { 1887 let 
did = Did::new(did.to_string()).unwrap(); 1888 let collection = Nsid::new(collection.to_string()).unwrap(); 1889 let record = RawValue::from_string(record.to_string()).unwrap(); 1890 let cid = cid.unwrap_or( 1891 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1892 .parse() 1893 .unwrap(), 1894 ); 1895 1896 let event = CommitEvent { 1897 collection, 1898 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1899 rev: rev.unwrap_or("asdf").to_string(), 1900 operation: CommitOp::Update, 1901 record: Some(record), 1902 cid: Some(cid), 1903 }; 1904 1905 let (commit, collection) = 1906 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1907 .unwrap(); 1908 1909 self.batch 1910 .commits_by_nsid 1911 .entry(collection.clone()) 1912 .or_default() 1913 .truncating_insert(commit, &[0u8; 16]) 1914 .unwrap(); 1915 1916 collection 1917 } 1918 #[allow(clippy::too_many_arguments)] 1919 pub fn delete( 1920 &mut self, 1921 did: &str, 1922 collection: &str, 1923 rkey: &str, 1924 rev: Option<&str>, 1925 cursor: u64, 1926 ) -> Nsid { 1927 let did = Did::new(did.to_string()).unwrap(); 1928 let collection = Nsid::new(collection.to_string()).unwrap(); 1929 let event = CommitEvent { 1930 collection, 1931 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1932 rev: rev.unwrap_or("asdf").to_string(), 1933 operation: CommitOp::Delete, 1934 record: None, 1935 cid: None, 1936 }; 1937 1938 let (commit, collection) = 1939 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap(); 1940 1941 self.batch 1942 .commits_by_nsid 1943 .entry(collection.clone()) 1944 .or_default() 1945 .truncating_insert(commit, &[0u8; 16]) 1946 .unwrap(); 1947 1948 collection 1949 } 1950 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did { 1951 let did = Did::new(did.to_string()).unwrap(); 1952 self.batch.account_removes.push(DeleteAccount { 1953 did: did.clone(), 1954 cursor: Cursor::from_raw_u64(cursor), 1955 }); 1956 did 1957 } 1958 } 1959 1960 
    // Empty batch: counts for an unseen collection are all zero.
    #[test]
    fn test_hello() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();
        write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.b.c".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        Ok(())
    }

    // One create, then rollup: counts and record fetch reflect exactly that
    // record, and an unrelated collection stays empty.
    #[test]
    fn test_insert_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "asdf",
            "{}",
            Some("rev-z"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("d.e.f".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), "{}");
        assert!(!rec.is_update);

        let records = read.get_records_by_collections(
            [Nsid::new("d.e.f".to_string()).unwrap()].into(),
            2,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    // Fetching multiple collections at once returns records newest-first
    // (descending cursor order), interleaved across collections.
    #[test]
    fn test_get_multi_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa",
            r#""earliest""#,
            Some("rev-a"),
            None,
            100,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.b",
            "aab",
            r#""in between""#,
            Some("rev-ab"),
            None,
            101,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa-2",
            r#""last""#,
            Some("rev-a-2"),
            None,
            102,
        );
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record.get(), r#""last""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );
        assert_eq!(records[1].record.get(), r#""in between""#);
        assert_eq!(
            records[1].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );
        assert_eq!(records[2].record.get(), r#""earliest""#);
        assert_eq!(
            records[2].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        Ok(())
    }

    // With expand=true, the per-collection limit (2) applies per collection,
    // so 3+3 inserts across two collections yield 4 records total.
    #[test]
    fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // insert some older ones in aab
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.b",
                &format!("aab-{i}"),
                &format!(r#""b {i}""#),
                Some(&format!("rev-b-{i}")),
                None,
                100 + i,
            );
        }
        // and some newer ones in aaa
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.a",
                &format!("aaa-{i}"),
                &format!(r#""a {i}""#),
                Some(&format!("rev-a-{i}")),
                None,
                200 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            2,
            true,
        )?;
        assert_eq!(records.len(), 4);
        assert_eq!(records[0].record.get(), r#""a 3""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        assert_eq!(records[3].record.get(), r#""b 2""#);
        assert_eq!(
            records[3].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );

        Ok(())
    }

    // An update replaces the stored record content and sets is_update, but
    // the create count is unchanged (updates are counted separately).
    #[test]
    fn test_update_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.update(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            r#"{"ch": "ch-ch-ch-changes"}"#,
            Some("rev-z"),
            None,
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
        assert!(rec.is_update);
        Ok(())
    }

    // A delete removes the live record but historical counts still show the
    // original create.
    #[test]
    fn test_delete_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            Some("rev-z"),
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    // trim_collection(limit=6) only trims collections that exceed the limit
    // (a.a.b: 10 -> 6); smaller/empty collections are untouched.
    #[test]
    fn test_collection_trim() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        // NOTE(review): last_b_cursor is assigned but never read afterwards —
        // looks like leftover scaffolding; confirm before removing.
        let mut last_b_cursor;
        for i in 1..=10 {
            last_b_cursor = 11_000 + i;
            batch.create(
                &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
                "a.a.b",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                last_b_cursor,
            );
        }
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.c",
            "rkey-ccc",
            "{}",
            Some("rev-ccc"),
            None,
            12_000,
        );

        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 10);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

        write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 6);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    // Synchronous account deletion removes exactly that DID's records and
    // reports how many were deleted.
    #[test]
    fn test_delete_account() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        for i in 1..=2 {
            batch.create(
                "did:plc:person-b",
                "a.a.a",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                11_000 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);

        let records_deleted =
            write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
        assert_eq!(records_deleted, 2);

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);

        Ok(())
    }

    // A queued account deletion is applied when the rollup step runs, even if
    // its cursor predates the record's cursor.
    #[test]
    fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
        write.insert_batch(batch.batch)?;

        write.step_rollup()?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    // step_rollup processes one pending item per call; an account delete
    // queued behind the rollup cursor (9_999 < current) is a no-op (n == 0).
    #[test]
    fn rollup_delete_live_count_step() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 10_001);
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 1);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999);
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    // Two separately-inserted batches roll up as two count items in one step,
    // then a second step finds nothing left.
    #[test]
    fn rollup_multiple_count_batches() -> anyhow::Result<()> {
        let (_read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aab",
            "{}",
            Some("rev-aab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 2);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    // Counts become visible only as each rollup step lands, in cursor order:
    // first batch, then the account delete (counts unchanged), then the
    // post-delete create.
    #[test]
    fn counts_before_and_after_rollup() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-b",
            "a.a.a",
            "rkey-bbb",
            "{}",
            Some("rev-bbb"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 11_000);
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aac",
            "{}",
            Some("rev-aac"),
            None,
            12_000,
        );
        write.insert_batch(batch.batch)?;

        // before any rollup
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        // first batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // delete account rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // second batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 3);
        assert_eq!(dids_estimate, 2);

        // no more rollups left
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    // Empty store: prefix query returns zero counts, no children, no cursor.
    #[test]
    fn get_prefix_children_lexi_empty() {
        let (read, _) = fjall_db();
        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read
            .get_prefix(
                NsidPrefix::new("aaa.aaa").unwrap(),
                10,
                OrderCollectionsBy::Lexi { cursor: None },
                None,
                None,
            )
            .unwrap();

        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
    }

    // A prefix query for "a.a.a" must not count the collection "a.a.a" itself
    // (a prefix only covers strictly-deeper segments).
    #[test]
    fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    // Prefix matching is segment-based: "a.a.a" must not match collection
    // "a.a.aa" even though it's a string prefix.
    #[test]
    fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.aa",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    // A collection one segment below the prefix appears as a Collection child
    // with its counts.
    #[test]
    fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Collection(NsidCount {
                nsid: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    // A collection two+ segments below the prefix surfaces as a Prefix child
    // named by the intermediate segment.
    #[test]
    fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1,
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    // Two collections sharing an intermediate segment merge into a single
    // Prefix child with summed counts (and a merged dids sketch).
    #[test]
    fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-a",
            "a.a.a.b",
            "rkey-aaab",
            "{}",
            Some("rev-aaab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 2,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    // Records at the prefix itself are excluded, while the direct child
    // collection and deeper prefix both appear; totals cover only children.
    #[test]
    fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // exact:
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        // child:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_001,
        );
        // prefix:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a.a",
            "rkey-aaaaa",
            "{}",
            Some("rev-aaaaa"),
            None,
            10_002,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![
                PrefixChild::Collection(NsidCount {
                    nsid: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
                PrefixChild::Prefix(PrefixCount {
                    prefix: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
            ]
        );
        assert_eq!(cursor, None);
        Ok(())
    }
}