Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use crate::db_types::{
    db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
};
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
    DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
    HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
    JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
    NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
    RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
    SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
    WEEK_IN_MICROS,
};
use crate::{
    nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
    NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
};
use async_trait::async_trait;
use fjall::{
    Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
use lsm_tree::AbstractTree;
use metrics::{
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
};
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
use std::path::Path;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::{Duration, Instant, SystemTime};

const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;

///
/// new data format, roughly:
///
/// Partition: 'global'
///
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
///   - key: "js_cursor" (literal)
///   - val: u64
///
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
///   - key: "js_endpoint" (literal)
///   - val: string (URL of the instance)
///
/// - Launch date
///   - key: "takeoff" (literal)
///   - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Cardinality estimator secret
///   - key: "sketch_secret" (literal)
///   - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
///   - key: "rollup_cursor" (literal)
///   - val: u64 (tracks behind js_cursor)
///
/// - Feed trim cursor (bg work: delete oldest excess records)
///   - key: "trim_cursor" || nullstr (nsid)
///   - val: u64 (earliest previously-removed feed entry jetstream cursor)
///
/// Partition: 'feed'
///
/// - Per-collection list of record references ordered by jetstream cursor
///   - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
///   - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
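///
/// illustrative key-layout sketch (assuming `nullstr` here means a null-terminated
/// UTF-8 string and that u64s are encoded big-endian so lexicographic key order
/// follows cursor order):
///
/// ```ignore
/// // a 'feed' key for app.bsky.feed.post at jetstream cursor 1234, roughly:
/// let mut key = b"app.bsky.feed.post\0".to_vec();
/// key.extend_from_slice(&1234u64.to_be_bytes());
/// ```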
///
///
/// Partition: 'records'
///
/// - Actual records by their atproto location
///   - key: nullstr || nullstr || nullstr (did, collection, rkey)
///   - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
///
///
/// Partition: 'rollups'
///
/// - Live (batched) records counts and dids estimate per collection
///   - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
///
/// - Hourly total record counts and dids estimate per collection
///   - key: "hourly_counts" || u64 || nullstr (hour, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Hourly record count ranking
///   - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
///   - val: [empty]
///
/// - Hourly did estimate ranking
///   - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
///   - val: [empty]
///
///
/// - Weekly total record counts and dids estimate per collection
///   - key: "weekly_counts" || u64 || nullstr (week, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Weekly record count ranking
///   - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
///   - val: [empty]
///
/// - Weekly did estimate ranking
///   - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
///   - val: [empty]
///
///
/// - All-time total record counts and dids estimate per collection
///   - key: "ever_counts" || nullstr (nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - All-time record count ranking
///   - key: "ever_rank_records" || u64 || nullstr (count, nsid)
///   - val: [empty]
///
/// - All-time did estimate ranking
///   - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
///   - val: [empty]
///
///
/// Partition: 'queues'
///
/// - Delete account queue
///   - key: "delete_acount" || u64 (js_cursor)
///   - val: nullstr (did)
///
///
/// TODO: moderation actions
/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
#[derive(Debug)]
pub struct FjallStorage {}

#[derive(Debug, Default)]
pub struct FjallConfig {
    /// drop the db when the storage is dropped
    ///
    /// this is only meant for tests
    #[cfg(test)]
    pub temp: bool,
}

impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
    fn init(
        path: impl AsRef<Path>,
        endpoint: String,
        force_endpoint: bool,
        _config: FjallConfig,
    ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
        let keyspace = {
            let config = Config::new(path);

            // #[cfg(not(test))]
            // let config = config.fsync_ms(Some(4_000));

            config.open()?
167 }; 168 169 let global = keyspace.open_partition("global", PartitionCreateOptions::default())?; 170 let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?; 171 let records = keyspace.open_partition("records", PartitionCreateOptions::default())?; 172 let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?; 173 let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?; 174 175 let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?; 176 177 let sketch_secret = if js_cursor.is_some() { 178 let stored_endpoint = 179 get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?; 180 let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError( 181 "found cursor but missing js_endpoint, refusing to start.".to_string(), 182 ))?; 183 184 let Some(stored_secret) = 185 get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)? 186 else { 187 return Err(StorageError::InitError( 188 "found cursor but missing sketch_secret, refusing to start.".to_string(), 189 )); 190 }; 191 192 if stored != endpoint { 193 if force_endpoint { 194 log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 195 insert_static_neu::<JetstreamEndpointKey>( 196 &global, 197 JetstreamEndpointValue(endpoint.to_string()), 198 )?; 199 } else { 200 return Err(StorageError::InitError(format!( 201 "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force."))); 202 } 203 } 204 stored_secret 205 } else { 206 log::info!("initializing a fresh db!"); 207 init_static_neu::<JetstreamEndpointKey>( 208 &global, 209 JetstreamEndpointValue(endpoint.to_string()), 210 )?; 211 212 log::info!("generating new secret for cardinality sketches..."); 213 let mut sketch_secret: SketchSecretPrefix = [0u8; 16]; 214 getrandom::fill(&mut sketch_secret).map_err(|e| { 215 StorageError::InitError(format!( 216 "failed to get a random secret for cardinality sketches: {e:?}" 217 )) 218 })?; 219 init_static_neu::<SketchSecretKey>(&global, sketch_secret)?; 220 221 init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?; 222 init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?; 223 224 sketch_secret 225 }; 226 227 let reader = FjallReader { 228 keyspace: keyspace.clone(), 229 global: global.clone(), 230 feeds: feeds.clone(), 231 records: records.clone(), 232 rollups: rollups.clone(), 233 queues: queues.clone(), 234 }; 235 reader.describe_metrics(); 236 let writer = FjallWriter { 237 bg_taken: Arc::new(AtomicBool::new(false)), 238 keyspace, 239 global, 240 feeds, 241 records, 242 rollups, 243 queues, 244 }; 245 writer.describe_metrics(); 246 Ok((reader, writer, js_cursor, sketch_secret)) 247 } 248} 249 250type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>; 251 252#[derive(Clone)] 253pub struct FjallReader { 254 keyspace: Keyspace, 255 global: PartitionHandle, 256 feeds: PartitionHandle, 257 records: PartitionHandle, 258 rollups: PartitionHandle, 259 queues: PartitionHandle, 260} 261 262/// An iterator that knows how to skip over deleted/invalidated records 263struct RecordIterator { 264 db_iter: Box<dyn Iterator<Item = FjallRKV>>, 265 records: PartitionHandle, 266 limit: usize, 267 fetched: usize, 268} 269impl RecordIterator { 270 pub fn new( 271 feeds: &PartitionHandle, 272 records: PartitionHandle, 273 collection: &Nsid, 274 limit: usize, 275 ) -> StorageResult<Self> { 276 let prefix = 
NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 277 let db_iter = feeds.prefix(prefix).rev(); 278 Ok(Self { 279 db_iter: Box::new(db_iter), 280 records, 281 limit, 282 fetched: 0, 283 }) 284 } 285 fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> { 286 let (key_bytes, val_bytes) = db_next?; 287 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 288 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 289 let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 290 291 let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else { 292 // record was deleted (hopefully) 293 return Ok(None); 294 }; 295 296 let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 297 298 if meta.cursor() != feed_key.cursor() { 299 // older/different version 300 return Ok(None); 301 } 302 if meta.rev != feed_val.rev() { 303 // weird... 304 log::warn!("record lookup: cursor match but rev did not...? excluding."); 305 return Ok(None); 306 } 307 let Some(raw_value_bytes) = location_val_bytes.get(n..) else { 308 log::warn!( 309 "record lookup: found record but could not get bytes to decode the record??" 310 ); 311 return Ok(None); 312 }; 313 let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?; 314 Ok(Some(UFOsRecord { 315 collection: feed_key.collection().clone(), 316 cursor: feed_key.cursor(), 317 did: feed_val.did().clone(), 318 rkey: feed_val.rkey().clone(), 319 rev: meta.rev.to_string(), 320 record: rawval.try_into()?, 321 is_update: meta.is_update, 322 })) 323 } 324} 325impl Iterator for RecordIterator { 326 type Item = StorageResult<Option<UFOsRecord>>; 327 fn next(&mut self) -> Option<Self::Item> { 328 if self.fetched == self.limit { 329 return Some(Ok(None)); 330 } 331 let record = loop { 332 let db_next = self.db_iter.next()?; // None short-circuits here 333 match self.get_record(db_next) { 334 Err(e) => return Some(Err(e)), 335 Ok(Some(record)) => break record, 336 Ok(None) => continue, 337 } 338 }; 339 self.fetched += 1; 340 Some(Ok(Some(record))) 341 } 342} 343 344type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>; 345type GetByterCounts = StorageResult<(Nsid, GetCounts)>; 346type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>; 347fn get_lexi_iter<T: WithCollection + DbBytes + 'static>( 348 snapshot: &Snapshot, 349 start: Bound<Vec<u8>>, 350 end: Bound<Vec<u8>>, 351) -> StorageResult<NsidCounter> { 352 Ok(Box::new(snapshot.range((start, end)).map(|kv| { 353 let (k_bytes, v_bytes) = kv?; 354 let key = db_complete::<T>(&k_bytes)?; 355 let nsid = key.collection().clone(); 356 let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?)); 357 Ok((nsid, get_counts)) 358 }))) 359} 360type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>; 361fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>( 362 snapshot: lsm_tree::Snapshot, 363 start: Bound<Vec<u8>>, 364 end: Bound<Vec<u8>>, 365 get_rollup_key: GetRollupKey, 366) -> StorageResult<NsidCounter> { 367 Ok(Box::new(snapshot.range((start, end)).rev().map( 368 move |kv| { 369 let (k_bytes, _) = kv?; 370 let key = db_complete::<T>(&k_bytes)?; 371 let nsid = key.collection().clone(); 372 let get_counts: GetCounts = Box::new({ 373 let nsid = nsid.clone(); 374 let snapshot = snapshot.clone(); 375 let get_rollup_key = get_rollup_key.clone(); 376 move || { 377 let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect( 378 "integrity: all-time rank rollup must have 
corresponding all-time count rollup", 379 ); 380 Ok(db_complete::<CountsValue>(&db_count_bytes)?) 381 } 382 }); 383 Ok((nsid, get_counts)) 384 }, 385 ))) 386} 387 388type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>; 389 390impl FjallReader { 391 fn describe_metrics(&self) { 392 describe_gauge!( 393 "storage_fjall_l0_run_count", 394 Unit::Count, 395 "number of L0 runs in a partition" 396 ); 397 describe_gauge!( 398 "storage_fjall_keyspace_disk_space", 399 Unit::Bytes, 400 "total storage used according to fjall" 401 ); 402 describe_gauge!( 403 "storage_fjall_journal_count", 404 Unit::Count, 405 "total keyspace journals according to fjall" 406 ); 407 describe_gauge!( 408 "storage_fjall_keyspace_sequence", 409 Unit::Count, 410 "fjall keyspace sequence" 411 ); 412 } 413 414 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 415 let rollup_cursor = 416 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 417 .map(|c| c.to_raw_u64()); 418 419 Ok(serde_json::json!({ 420 "keyspace_disk_space": self.keyspace.disk_space(), 421 "keyspace_journal_count": self.keyspace.journal_count(), 422 "keyspace_sequence": self.keyspace.instant(), 423 "rollup_cursor": rollup_cursor, 424 })) 425 } 426 427 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 428 let global = self.global.snapshot(); 429 430 let endpoint = 431 get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)? 432 .ok_or(StorageError::BadStateError( 433 "Could not find jetstream endpoint".to_string(), 434 ))? 435 .0; 436 437 let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)? 438 .ok_or(StorageError::BadStateError( 439 "Could not find jetstream takeoff time".to_string(), 440 ))? 441 .to_raw_u64(); 442 443 let latest_cursor = 444 get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)? 445 .map(|c| c.to_raw_u64()); 446 447 let rollup_cursor = 448 get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)? 449 .map(|c| c.to_raw_u64()); 450 451 Ok(ConsumerInfo::Jetstream { 452 endpoint, 453 started_at, 454 latest_cursor, 455 rollup_cursor, 456 }) 457 } 458 459 fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> { 460 let cursor = rollups 461 .unwrap_or(&self.rollups.snapshot()) 462 .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?) 463 .next() 464 .transpose()? 465 .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes)) 466 .transpose()? 467 .map(|key| key.cursor()) 468 .unwrap_or_else(|| Cursor::from_start().into()); 469 Ok(cursor) 470 } 471 472 fn get_lexi_collections( 473 &self, 474 snapshot: Snapshot, 475 limit: usize, 476 cursor: Option<Vec<u8>>, 477 buckets: Vec<CursorBucket>, 478 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 479 let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?; 480 let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len()); 481 for bucket in &buckets { 482 let it: NsidCounter = match bucket { 483 CursorBucket::Hour(t) => { 484 let start = cursor_nsid 485 .as_ref() 486 .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid)) 487 .unwrap_or_else(|| HourlyRollupKey::start(*t))?; 488 let end = HourlyRollupKey::end(*t)?; 489 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)? 
490 } 491 CursorBucket::Week(t) => { 492 let start = cursor_nsid 493 .as_ref() 494 .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid)) 495 .unwrap_or_else(|| WeeklyRollupKey::start(*t))?; 496 let end = WeeklyRollupKey::end(*t)?; 497 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)? 498 } 499 CursorBucket::AllTime => { 500 let start = cursor_nsid 501 .as_ref() 502 .map(AllTimeRollupKey::after_nsid) 503 .unwrap_or_else(AllTimeRollupKey::start)?; 504 let end = AllTimeRollupKey::end()?; 505 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)? 506 } 507 }; 508 iters.push(it.peekable()); 509 } 510 511 let mut out = Vec::new(); 512 let mut current_nsid = None; 513 for _ in 0..limit { 514 // double-scan the iters for each element: this could be eliminated but we're starting simple. 515 // first scan: find the lowest nsid 516 // second scan: take + merge, and advance all iters with lowest nsid 517 let mut lowest: Option<Nsid> = None; 518 for iter in &mut iters { 519 if let Some(bla) = iter.peek_mut() { 520 let (nsid, _) = match bla { 521 Ok(v) => v, 522 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 523 }; 524 lowest = match lowest { 525 Some(ref current) if nsid.as_str() > current.as_str() => lowest, 526 _ => Some(nsid.clone()), 527 }; 528 } 529 } 530 current_nsid = lowest.clone(); 531 let Some(nsid) = lowest else { break }; 532 533 let mut merged = CountsValue::default(); 534 for iter in &mut iters { 535 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop 536 if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) { 537 let counts = get_counts()?; 538 merged.merge(&counts); 539 } 540 } 541 out.push(NsidCount::new(&nsid, &merged)); 542 } 543 544 let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?; 545 Ok((out, next_cursor)) 546 } 547 548 fn get_ordered_collections( 549 &self, 550 snapshot: Snapshot, 551 limit: usize, 552 order: OrderCollectionsBy, 553 buckets: Vec<CursorBucket>, 554 ) -> StorageResult<Vec<NsidCount>> { 555 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len()); 556 557 for bucket in buckets { 558 let it: NsidCounter = match (&order, bucket) { 559 (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => { 560 get_lookup_iter::<HourlyRecordsKey>( 561 snapshot.clone(), 562 HourlyRecordsKey::start(t)?, 563 HourlyRecordsKey::end(t)?, 564 Arc::new({ 565 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes() 566 }), 567 )? 568 } 569 (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => { 570 get_lookup_iter::<HourlyDidsKey>( 571 snapshot.clone(), 572 HourlyDidsKey::start(t)?, 573 HourlyDidsKey::end(t)?, 574 Arc::new({ 575 move |collection| HourlyRollupKey::new(t, collection).to_db_bytes() 576 }), 577 )? 578 } 579 (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => { 580 get_lookup_iter::<WeeklyRecordsKey>( 581 snapshot.clone(), 582 WeeklyRecordsKey::start(t)?, 583 WeeklyRecordsKey::end(t)?, 584 Arc::new({ 585 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes() 586 }), 587 )? 588 } 589 (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => { 590 get_lookup_iter::<WeeklyDidsKey>( 591 snapshot.clone(), 592 WeeklyDidsKey::start(t)?, 593 WeeklyDidsKey::end(t)?, 594 Arc::new({ 595 move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes() 596 }), 597 )? 
598 } 599 (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => { 600 get_lookup_iter::<AllTimeRecordsKey>( 601 snapshot.clone(), 602 AllTimeRecordsKey::start()?, 603 AllTimeRecordsKey::end()?, 604 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()), 605 )? 606 } 607 (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => { 608 get_lookup_iter::<AllTimeDidsKey>( 609 snapshot.clone(), 610 AllTimeDidsKey::start()?, 611 AllTimeDidsKey::end()?, 612 Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()), 613 )? 614 } 615 (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(), 616 }; 617 iters.push(it); 618 } 619 620 // overfetch by taking a bit more than the limit 621 // merge by collection 622 // sort by requested order, take limit, discard all remaining 623 // 624 // this isn't guaranteed to be correct, but it will hopefully be close most of the time: 625 // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged 626 // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah. 627 // 628 // this thing is heavy, there's probably a better way 629 let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2); 630 for iter in iters { 631 for pair in iter.take((limit as f64 * 1.3).ceil() as usize) { 632 let (nsid, get_counts) = pair?; 633 let counts = get_counts()?; 634 ranked.entry(nsid).or_default().merge(&counts); 635 } 636 } 637 let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect(); 638 match order { 639 OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates), 640 OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()), 641 OrderCollectionsBy::Lexi { .. 
} => unreachable!(), 642 } 643 let counts = ranked 644 .into_iter() 645 .rev() 646 .take(limit) 647 .map(|(nsid, cv)| NsidCount::new(&nsid, &cv)) 648 .collect(); 649 Ok(counts) 650 } 651 652 fn get_collections( 653 &self, 654 limit: usize, 655 order: OrderCollectionsBy, 656 since: Option<HourTruncatedCursor>, 657 until: Option<HourTruncatedCursor>, 658 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 659 let snapshot = self.rollups.snapshot(); 660 let buckets = if let (None, None) = (since, until) { 661 vec![CursorBucket::AllTime] 662 } else { 663 let mut lower = self.get_earliest_hour(Some(&snapshot))?; 664 if let Some(specified) = since { 665 if specified > lower { 666 lower = specified; 667 } 668 } 669 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 670 CursorBucket::buckets_spanning(lower, upper) 671 }; 672 match order { 673 OrderCollectionsBy::Lexi { cursor } => { 674 self.get_lexi_collections(snapshot, limit, cursor, buckets) 675 } 676 _ => Ok(( 677 self.get_ordered_collections(snapshot, limit, order, buckets)?, 678 None, 679 )), 680 } 681 } 682 683 fn get_lexi_prefix( 684 &self, 685 snapshot: Snapshot, 686 prefix: NsidPrefix, 687 limit: usize, 688 cursor: Option<Vec<u8>>, 689 buckets: Vec<CursorBucket>, 690 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 691 // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?; 692 let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match 693 let cursor_child = cursor 694 .as_deref() 695 .map(|encoded_bytes| { 696 let decoded: String = db_complete(encoded_bytes)?; 697 // TODO: write some tests for cursors, there's probably bugs here 698 let as_sub_prefix_with_null = decoded.to_db_bytes()?; 699 Ok::<_, EncodingError>(as_sub_prefix_with_null) 700 }) 701 .transpose()?; 702 let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len()); 703 for bucket in &buckets { 704 let it: NsidCounter = match bucket { 705 CursorBucket::Hour(t) => { 706 let start = cursor_child 707 .as_ref() 708 .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child)) 709 .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?; 710 let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?; 711 get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)? 712 } 713 CursorBucket::Week(t) => { 714 let start = cursor_child 715 .as_ref() 716 .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child)) 717 .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?; 718 let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?; 719 get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)? 720 } 721 CursorBucket::AllTime => { 722 let start = cursor_child 723 .as_ref() 724 .map(|child| AllTimeRollupKey::after_nsid_prefix(child)) 725 .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?; 726 let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?; 727 get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)? 
728 } 729 }; 730 iters.push(it); 731 } 732 733 // with apologies 734 let mut iters: Vec<_> = iters 735 .into_iter() 736 .map(|it| { 737 it.map(|bla| { 738 bla.map(|(nsid, v)| { 739 let Some(child) = Child::from_prefix(&nsid, &prefix) else { 740 panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)"); 741 }; 742 (child, v) 743 }) 744 }) 745 .peekable() 746 }) 747 .collect(); 748 749 let mut items = Vec::new(); 750 let mut prefix_count = CountsValue::default(); 751 #[derive(Debug, Clone, PartialEq)] 752 enum Child { 753 FullNsid(Nsid), 754 ChildPrefix(String), 755 } 756 impl Child { 757 fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> { 758 if prefix.is_group_of(nsid) { 759 return Some(Child::FullNsid(nsid.clone())); 760 } 761 let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?; 762 let (segment, _) = suffix.split_once('.').unwrap(); 763 let child_prefix = format!("{}.{segment}", prefix.0); 764 Some(Child::ChildPrefix(child_prefix)) 765 } 766 fn is_before(&self, other: &Child) -> bool { 767 match (self, other) { 768 (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true, 769 (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false, 770 (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(), 771 (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o, 772 (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o, 773 (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(), 774 } 775 } 776 fn into_inner(self) -> String { 777 match self { 778 Child::FullNsid(s) => s.to_string(), 779 Child::ChildPrefix(s) => s, 780 } 781 } 782 } 783 let mut current_child: Option<Child> = None; 784 for _ in 0..limit { 785 // double-scan the iters for each element: this could be eliminated but we're starting simple. 786 // first scan: find the lowest nsid 787 // second scan: take + merge, and advance all iters with lowest nsid 788 let mut lowest: Option<Child> = None; 789 for iter in &mut iters { 790 if let Some(bla) = iter.peek_mut() { 791 let (child, _) = match bla { 792 Ok(v) => v, 793 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 794 }; 795 796 lowest = match lowest { 797 Some(ref current) if current.is_before(child) => lowest, 798 _ => Some(child.clone()), 799 }; 800 } 801 } 802 current_child = lowest.clone(); 803 let Some(child) = lowest else { break }; 804 805 let mut merged = CountsValue::default(); 806 for iter in &mut iters { 807 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop 808 while let Some(Ok((_, get_counts))) = 809 iter.next_if(|v| v.as_ref().unwrap().0 == child) 810 { 811 let counts = get_counts()?; 812 prefix_count.merge(&counts); 813 merged.merge(&counts); 814 } 815 } 816 items.push(match child { 817 Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)), 818 Child::ChildPrefix(prefix) => { 819 PrefixChild::Prefix(PrefixCount::new(&prefix, &merged)) 820 } 821 }); 822 } 823 824 // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up? 
825 // ....er the sketch is probably too big 826 // TODO: this is probably buggy on child-type boundaries bleh 827 let next_cursor = current_child 828 .map(|s| s.into_inner().to_db_bytes()) 829 .transpose()?; 830 831 Ok(((&prefix_count).into(), items, next_cursor)) 832 } 833 834 fn get_prefix( 835 &self, 836 prefix: NsidPrefix, 837 limit: usize, 838 order: OrderCollectionsBy, 839 since: Option<HourTruncatedCursor>, 840 until: Option<HourTruncatedCursor>, 841 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 842 let snapshot = self.rollups.snapshot(); 843 let buckets = if let (None, None) = (since, until) { 844 vec![CursorBucket::AllTime] 845 } else { 846 let mut lower = self.get_earliest_hour(Some(&snapshot))?; 847 if let Some(specified) = since { 848 if specified > lower { 849 lower = specified; 850 } 851 } 852 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 853 CursorBucket::buckets_spanning(lower, upper) 854 }; 855 match order { 856 OrderCollectionsBy::Lexi { cursor } => { 857 self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets) 858 } 859 _ => todo!(), 860 } 861 } 862 863 /// - step: output series time step, in seconds 864 fn get_timeseries( 865 &self, 866 collections: Vec<Nsid>, 867 since: HourTruncatedCursor, 868 until: Option<HourTruncatedCursor>, 869 step: u64, 870 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> { 871 if step > WEEK_IN_MICROS { 872 panic!("week-stepping is todo"); 873 } 874 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 875 let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else { 876 return Ok(( 877 // empty: until < since 878 vec![], 879 collections.into_iter().map(|c| (c, vec![])).collect(), 880 )); 881 }; 882 let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS; 883 let mut counts_by_hour = Vec::with_capacity(n_hours as usize); 884 let snapshot = self.rollups.snapshot(); 885 for hour in (0..n_hours).map(|i| since.nth_next(i)) { 886 let mut counts = Vec::with_capacity(collections.len()); 887 for nsid in &collections { 888 let count = snapshot 889 .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)? 890 .as_deref() 891 .map(db_complete::<CountsValue>) 892 .transpose()? 
893 .unwrap_or_default(); 894 counts.push(count); 895 } 896 counts_by_hour.push((hour, counts)); 897 } 898 899 let step_hours = step / (HOUR_IN_MICROS / 1_000_000); 900 let mut output_hours = Vec::with_capacity(step_hours as usize); 901 let mut output_series: CollectionSerieses = collections 902 .iter() 903 .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize))) 904 .collect(); 905 906 for chunk in counts_by_hour.chunks(step_hours as usize) { 907 output_hours.push(chunk[0].0); // always guaranteed to have at least one element in a chunks chunk 908 for (i, collection) in collections.iter().enumerate() { 909 let mut c = CountsValue::default(); 910 for (_, counts) in chunk { 911 c.merge(&counts[i]); 912 } 913 output_series 914 .get_mut(collection) 915 .expect("output series is initialized with all collections") 916 .push(c); 917 } 918 } 919 920 Ok((output_hours, output_series)) 921 } 922 923 fn get_collection_counts( 924 &self, 925 collection: &Nsid, 926 since: HourTruncatedCursor, 927 until: Option<HourTruncatedCursor>, 928 ) -> StorageResult<JustCount> { 929 // grab snapshots in case rollups happen while we're working 930 let rollups = self.rollups.snapshot(); 931 932 let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 933 let buckets = CursorBucket::buckets_spanning(since, until); 934 let mut total_counts = CountsValue::default(); 935 936 for bucket in buckets { 937 let key = match bucket { 938 CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?, 939 CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?, 940 CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset? 941 }; 942 let count = rollups 943 .get(&key)? 944 .as_deref() 945 .map(db_complete::<CountsValue>) 946 .transpose()? 
947 .unwrap_or_default(); 948 total_counts.merge(&count); 949 } 950 951 Ok((&total_counts).into()) 952 } 953 954 fn get_records_by_collections( 955 &self, 956 collections: HashSet<Nsid>, 957 limit: usize, 958 expand_each_collection: bool, 959 ) -> StorageResult<Vec<UFOsRecord>> { 960 if collections.is_empty() { 961 return Ok(vec![]); 962 } 963 let mut record_iterators = Vec::new(); 964 for collection in collections { 965 let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?; 966 record_iterators.push(iter.peekable()); 967 } 968 let mut merged = Vec::new(); 969 loop { 970 let mut latest: Option<(Cursor, usize)> = None; // ugh 971 for (i, iter) in record_iterators.iter_mut().enumerate() { 972 let Some(it) = iter.peek_mut() else { 973 continue; 974 }; 975 let it = match it { 976 Ok(v) => v, 977 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 978 }; 979 let Some(rec) = it else { 980 if expand_each_collection { 981 continue; 982 } else { 983 break; 984 } 985 }; 986 if let Some((cursor, _)) = latest { 987 if rec.cursor > cursor { 988 latest = Some((rec.cursor, i)) 989 } 990 } else { 991 latest = Some((rec.cursor, i)); 992 } 993 } 994 let Some((_, idx)) = latest else { 995 break; 996 }; 997 // yeah yeah whateverrrrrrrrrrrrrrrr 998 merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap()); 999 } 1000 Ok(merged) 1001 } 1002 1003 fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> { 1004 let start = AllTimeRollupKey::start()?; 1005 let end = AllTimeRollupKey::end()?; 1006 let mut matches = Vec::new(); 1007 let limit = 16; // TODO: param 1008 for kv in self.rollups.range((start, end)) { 1009 let (key_bytes, val_bytes) = kv?; 1010 let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 1011 let nsid = key.collection(); 1012 for term in &terms { 1013 if nsid.contains(term) { 1014 let counts = db_complete::<CountsValue>(&val_bytes)?; 1015 matches.push(NsidCount::new(nsid, &counts)); 1016 break; 1017 } 1018 } 1019 if matches.len() >= limit { 1020 break; 1021 } 1022 } 1023 // TODO: indicate incomplete results 1024 Ok(matches) 1025 } 1026} 1027 1028#[async_trait] 1029impl StoreReader for FjallReader { 1030 fn name(&self) -> String { 1031 "fjall storage v2".into() 1032 } 1033 fn update_metrics(&self) { 1034 gauge!("storage_fjall_l0_run_count", "partition" => "global") 1035 .set(self.global.tree.l0_run_count() as f64); 1036 gauge!("storage_fjall_l0_run_count", "partition" => "feeds") 1037 .set(self.feeds.tree.l0_run_count() as f64); 1038 gauge!("storage_fjall_l0_run_count", "partition" => "records") 1039 .set(self.records.tree.l0_run_count() as f64); 1040 gauge!("storage_fjall_l0_run_count", "partition" => "rollups") 1041 .set(self.rollups.tree.l0_run_count() as f64); 1042 gauge!("storage_fjall_l0_run_count", "partition" => "queues") 1043 .set(self.queues.tree.l0_run_count() as f64); 1044 gauge!("storage_fjall_keyspace_disk_space").set(self.keyspace.disk_space() as f64); 1045 gauge!("storage_fjall_journal_count").set(self.keyspace.journal_count() as f64); 1046 gauge!("storage_fjall_keyspace_sequence").set(self.keyspace.instant() as f64); 1047 } 1048 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 1049 let s = self.clone(); 1050 tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await? 1051 } 1052 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 1053 let s = self.clone(); 1054 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await? 
1055 } 1056 async fn get_collections( 1057 &self, 1058 limit: usize, 1059 order: OrderCollectionsBy, 1060 since: Option<HourTruncatedCursor>, 1061 until: Option<HourTruncatedCursor>, 1062 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 1063 let s = self.clone(); 1064 tokio::task::spawn_blocking(move || { 1065 FjallReader::get_collections(&s, limit, order, since, until) 1066 }) 1067 .await? 1068 } 1069 async fn get_prefix( 1070 &self, 1071 prefix: NsidPrefix, 1072 limit: usize, 1073 order: OrderCollectionsBy, 1074 since: Option<HourTruncatedCursor>, 1075 until: Option<HourTruncatedCursor>, 1076 ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> { 1077 let s = self.clone(); 1078 tokio::task::spawn_blocking(move || { 1079 FjallReader::get_prefix(&s, prefix, limit, order, since, until) 1080 }) 1081 .await? 1082 } 1083 async fn get_timeseries( 1084 &self, 1085 collections: Vec<Nsid>, 1086 since: HourTruncatedCursor, 1087 until: Option<HourTruncatedCursor>, 1088 step: u64, 1089 ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> { 1090 let s = self.clone(); 1091 tokio::task::spawn_blocking(move || { 1092 FjallReader::get_timeseries(&s, collections, since, until, step) 1093 }) 1094 .await? 1095 } 1096 async fn get_collection_counts( 1097 &self, 1098 collection: &Nsid, 1099 since: HourTruncatedCursor, 1100 until: Option<HourTruncatedCursor>, 1101 ) -> StorageResult<JustCount> { 1102 let s = self.clone(); 1103 let collection = collection.clone(); 1104 tokio::task::spawn_blocking(move || { 1105 FjallReader::get_collection_counts(&s, &collection, since, until) 1106 }) 1107 .await? 1108 } 1109 async fn get_records_by_collections( 1110 &self, 1111 collections: HashSet<Nsid>, 1112 limit: usize, 1113 expand_each_collection: bool, 1114 ) -> StorageResult<Vec<UFOsRecord>> { 1115 let s = self.clone(); 1116 tokio::task::spawn_blocking(move || { 1117 FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection) 1118 }) 1119 .await? 1120 } 1121 async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> { 1122 let s = self.clone(); 1123 tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await? 
    }
}

#[derive(Clone)]
pub struct FjallWriter {
    bg_taken: Arc<AtomicBool>,
    keyspace: Keyspace,
    global: PartitionHandle,
    feeds: PartitionHandle,
    records: PartitionHandle,
    rollups: PartitionHandle,
    queues: PartitionHandle,
}

impl FjallWriter {
    fn describe_metrics(&self) {
        describe_histogram!(
            "storage_insert_batch_db_batch_items",
            Unit::Count,
            "how many items are in the fjall batch for batched inserts"
        );
        describe_histogram!(
            "storage_rollup_counts_db_batch_items",
            Unit::Count,
            "how many items are in the fjall batch for a timelies rollup"
        );
        describe_counter!(
            "storage_delete_account_partial_commits",
            Unit::Count,
            "fjall checkpoint commits for cleaning up accounts with too many records"
        );
        describe_counter!(
            "storage_delete_account_completions",
            Unit::Count,
            "total count of account deletes handled"
        );
        describe_counter!(
            "storage_delete_account_records_deleted",
            Unit::Count,
            "total records deleted when handling account deletes"
        );
        describe_histogram!(
            "storage_trim_dirty_nsids",
            Unit::Count,
            "number of NSIDs trimmed"
        );
        describe_histogram!(
            "storage_trim_duration",
            Unit::Microseconds,
            "how long it took to trim the dirty NSIDs"
        );
        describe_counter!(
            "storage_trim_removed",
            Unit::Count,
            "how many records were removed during trim"
        );
    }
    fn rollup_delete_account(
        &mut self,
        cursor: Cursor,
        key_bytes: &[u8],
        val_bytes: &[u8],
    ) -> StorageResult<usize> {
        let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
        self.delete_account(&did)?;
        let mut batch = self.keyspace.batch();
        batch.remove(&self.queues, key_bytes);
        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
        batch.commit()?;
        Ok(1)
    }

    fn rollup_live_counts(
        &mut self,
        timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
        cursor_exclusive_limit: Option<Cursor>,
        rollup_limit: usize,
    ) -> StorageResult<(usize, HashSet<Nsid>)> {
        // current strategy is to buffer counts in mem before writing the rollups
        // we *could* read+write every single batch to rollup.. but their merge is associative,
        // so buffering here saves the db some work up front? is this worth it? who knows...
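        //
        // concretely: live-count entries for the same nsid that land in the same hour or
        // week collapse into one in-memory (nsid, rollup) entry below, so each hourly /
        // weekly / all-time rollup row gets read and rewritten at most once per call
        // instead of once per live entry.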
1205 1206 let mut dirty_nsids = HashSet::new(); 1207 1208 #[derive(Eq, Hash, PartialEq)] 1209 enum Rollup { 1210 Hourly(HourTruncatedCursor), 1211 Weekly(WeekTruncatedCursor), 1212 AllTime, 1213 } 1214 1215 let mut batch = self.keyspace.batch(); 1216 let mut cursors_advanced = 0; 1217 let mut last_cursor = Cursor::from_start(); 1218 let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new(); 1219 1220 for (i, kv) in timelies.enumerate() { 1221 if i >= rollup_limit { 1222 break; 1223 } 1224 1225 let (key_bytes, val_bytes) = kv?; 1226 let key = db_complete::<LiveCountsKey>(&key_bytes)?; 1227 1228 if cursor_exclusive_limit 1229 .map(|limit| key.cursor() > limit) 1230 .unwrap_or(false) 1231 { 1232 break; 1233 } 1234 1235 dirty_nsids.insert(key.collection().clone()); 1236 1237 batch.remove(&self.rollups, key_bytes); 1238 let val = db_complete::<CountsValue>(&val_bytes)?; 1239 counts_by_rollup 1240 .entry(( 1241 key.collection().clone(), 1242 Rollup::Hourly(key.cursor().into()), 1243 )) 1244 .or_default() 1245 .merge(&val); 1246 counts_by_rollup 1247 .entry(( 1248 key.collection().clone(), 1249 Rollup::Weekly(key.cursor().into()), 1250 )) 1251 .or_default() 1252 .merge(&val); 1253 counts_by_rollup 1254 .entry((key.collection().clone(), Rollup::AllTime)) 1255 .or_default() 1256 .merge(&val); 1257 1258 cursors_advanced += 1; 1259 last_cursor = key.cursor(); 1260 } 1261 1262 // go through each new rollup thing and merge it with whatever might already be in the db 1263 for ((nsid, rollup), counts) in counts_by_rollup { 1264 let rollup_key_bytes = match rollup { 1265 Rollup::Hourly(hourly_cursor) => { 1266 HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()? 1267 } 1268 Rollup::Weekly(weekly_cursor) => { 1269 WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()? 1270 } 1271 Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?, 1272 }; 1273 let mut rolled: CountsValue = self 1274 .rollups 1275 .get(&rollup_key_bytes)? 1276 .as_deref() 1277 .map(db_complete::<CountsValue>) 1278 .transpose()? 
                .unwrap_or_default();

            // now that we have values, we can know the existing ranks
            let before_creates_count = rolled.counts().creates;
            let before_dids_estimate = rolled.dids().estimate() as u64;

            // update the rollup
            rolled.merge(&counts);

            // new ranks
            let new_creates_count = rolled.counts().creates;
            let new_dids_estimate = rolled.dids().estimate() as u64;

            // update create-ranked secondary index if rank changed
            if new_creates_count != before_creates_count {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
                        AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
                    ),
                };
                // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
                batch.remove_weak(&self.rollups, &old_k);
                batch.insert(&self.rollups, &new_k, "");
            }

            // update dids-ranked secondary index if rank changed
            if new_dids_estimate != before_dids_estimate {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
                        AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
                    ),
                };
                // remove_weak is allowed here because the secondary ranking index only ever inserts once at a key
                batch.remove_weak(&self.rollups, &old_k);
                batch.insert(&self.rollups, &new_k, "");
            }

            // replace the main counts rollup
            batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
        }

        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;

        histogram!("storage_rollup_counts_db_batch_items").record(batch.len() as f64);
        batch.commit()?;
        Ok((cursors_advanced, dirty_nsids))
    }
}

impl StoreWriter<FjallBackground> for FjallWriter {
    fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
        if self.bg_taken.swap(true, Ordering::SeqCst) {
            return Err(StorageError::BackgroundAlreadyStarted);
        }
        if reroll {
            log::info!("reroll: resetting rollup cursor...");
            insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
            log::info!("reroll: clearing trim cursors...");
            let mut batch = self.keyspace.batch();
            for kv in self
                .global
                .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
                    &Default::default(),
                )?)
1369 { 1370 let (k, _) = kv?; 1371 batch.remove(&self.global, k); 1372 } 1373 let n = batch.len(); 1374 batch.commit()?; 1375 log::info!("reroll: cleared {n} trim cursors."); 1376 } 1377 Ok(FjallBackground(self.clone())) 1378 } 1379 1380 fn insert_batch<const LIMIT: usize>( 1381 &mut self, 1382 event_batch: EventBatch<LIMIT>, 1383 ) -> StorageResult<()> { 1384 if event_batch.is_empty() { 1385 return Ok(()); 1386 } 1387 1388 let mut batch = self.keyspace.batch(); 1389 1390 // would be nice not to have to iterate everything at once here 1391 let latest = event_batch.latest_cursor().unwrap(); 1392 1393 for (nsid, commits) in event_batch.commits_by_nsid { 1394 for commit in commits.commits { 1395 let location_key: RecordLocationKey = (&commit, &nsid).into(); 1396 1397 match commit.action { 1398 CommitAction::Cut => { 1399 batch.remove(&self.records, &location_key.to_db_bytes()?); 1400 } 1401 CommitAction::Put(put_action) => { 1402 let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor); 1403 let feed_val: NsidRecordFeedVal = 1404 (&commit.did, &commit.rkey, commit.rev.as_str()).into(); 1405 batch.insert( 1406 &self.feeds, 1407 feed_key.to_db_bytes()?, 1408 feed_val.to_db_bytes()?, 1409 ); 1410 1411 let location_val: RecordLocationVal = 1412 (commit.cursor, commit.rev.as_str(), put_action).into(); 1413 batch.insert( 1414 &self.records, 1415 &location_key.to_db_bytes()?, 1416 &location_val.to_db_bytes()?, 1417 ); 1418 } 1419 } 1420 } 1421 let live_counts_key: LiveCountsKey = (latest, &nsid).into(); 1422 let counts_value = CountsValue::new( 1423 CommitCounts { 1424 creates: commits.creates as u64, 1425 updates: commits.updates as u64, 1426 deletes: commits.deletes as u64, 1427 }, 1428 commits.dids_estimate, 1429 ); 1430 batch.insert( 1431 &self.rollups, 1432 &live_counts_key.to_db_bytes()?, 1433 &counts_value.to_db_bytes()?, 1434 ); 1435 } 1436 1437 for remove in event_batch.account_removes { 1438 let queue_key = DeleteAccountQueueKey::new(remove.cursor); 1439 let queue_val: DeleteAccountQueueVal = remove.did; 1440 batch.insert( 1441 &self.queues, 1442 &queue_key.to_db_bytes()?, 1443 &queue_val.to_db_bytes()?, 1444 ); 1445 } 1446 1447 batch.insert( 1448 &self.global, 1449 DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?, 1450 latest.to_db_bytes()?, 1451 ); 1452 1453 histogram!("storage_insert_batch_db_batch_items").record(batch.len() as f64); 1454 batch.commit()?; 1455 Ok(()) 1456 } 1457 1458 fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> { 1459 let mut dirty_nsids = HashSet::new(); 1460 1461 let rollup_cursor = 1462 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 1463 StorageError::BadStateError("Could not find current rollup cursor".to_string()), 1464 )?; 1465 1466 // timelies 1467 let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 1468 let mut timely_iter = self.rollups.range(live_counts_range).peekable(); 1469 1470 let timely_next = timely_iter 1471 .peek_mut() 1472 .map(|kv| -> StorageResult<LiveCountsKey> { 1473 match kv { 1474 Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?, 1475 Ok((key_bytes, _)) => { 1476 let key = db_complete::<LiveCountsKey>(key_bytes)?; 1477 Ok(key) 1478 } 1479 } 1480 }) 1481 .transpose()?; 1482 1483 // delete accounts 1484 let delete_accounts_range = 1485 DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 1486 1487 let next_delete = self 1488 .queues 1489 .range(delete_accounts_range) 1490 .next() 1491 .transpose()? 
1492 .map(|(key_bytes, val_bytes)| { 1493 db_complete::<DeleteAccountQueueKey>(&key_bytes) 1494 .map(|k| (k.suffix, key_bytes, val_bytes)) 1495 }) 1496 .transpose()?; 1497 1498 let cursors_stepped = match (timely_next, next_delete) { 1499 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 1500 if timely.cursor() < delete_cursor { 1501 let (n, dirty) = self.rollup_live_counts( 1502 timely_iter, 1503 Some(delete_cursor), 1504 MAX_BATCHED_ROLLUP_COUNTS, 1505 )?; 1506 dirty_nsids.extend(dirty); 1507 n 1508 } else { 1509 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 1510 } 1511 } 1512 (Some(_), None) => { 1513 let (n, dirty) = 1514 self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?; 1515 dirty_nsids.extend(dirty); 1516 n 1517 } 1518 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 1519 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 1520 } 1521 (None, None) => 0, 1522 }; 1523 1524 Ok((cursors_stepped, dirty_nsids)) 1525 } 1526 1527 fn trim_collection( 1528 &mut self, 1529 collection: &Nsid, 1530 limit: usize, 1531 full_scan: bool, 1532 ) -> StorageResult<(usize, usize, bool)> { 1533 let mut dangling_feed_keys_cleaned = 0; 1534 let mut records_deleted = 0; 1535 1536 let live_range = if full_scan { 1537 let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 1538 let end = NsidRecordFeedKey::prefix_range_end(collection)?; 1539 start..end 1540 } else { 1541 let feed_trim_cursor_key = 1542 TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?; 1543 let trim_cursor = self 1544 .global 1545 .get(&feed_trim_cursor_key)? 1546 .map(|value_bytes| db_complete(&value_bytes)) 1547 .transpose()? 1548 .unwrap_or(Cursor::from_start()); 1549 NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()? 1550 }; 1551 1552 let mut live_records_found = 0; 1553 let mut candidate_new_feed_lower_cursor = None; 1554 let ended_early = false; 1555 let mut current_cursor: Option<Cursor> = None; 1556 for (i, kv) in self.feeds.range(live_range).rev().enumerate() { 1557 if i > 0 && i % 500_000 == 0 { 1558 log::info!( 1559 "trim: at {i} for {:?} (now at {})", 1560 collection.to_string(), 1561 current_cursor 1562 .map(|c| c 1563 .elapsed() 1564 .map(nice_duration) 1565 .unwrap_or("[not past]".into())) 1566 .unwrap_or("??".into()), 1567 ); 1568 } 1569 let (key_bytes, val_bytes) = kv?; 1570 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 1571 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 1572 let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 1573 let location_key_bytes = location_key.to_db_bytes()?; 1574 1575 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 1576 // record was deleted (hopefully) 1577 self.feeds.remove(&*key_bytes)?; 1578 dangling_feed_keys_cleaned += 1; 1579 continue; 1580 }; 1581 1582 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 1583 current_cursor = Some(meta.cursor()); 1584 1585 if meta.cursor() != feed_key.cursor() { 1586 // older/different version 1587 self.feeds.remove(&*key_bytes)?; 1588 dangling_feed_keys_cleaned += 1; 1589 continue; 1590 } 1591 if meta.rev != feed_val.rev() { 1592 // weird... 1593 log::warn!("record lookup: cursor match but rev did not...? 
removing."); 1594 self.records.remove(&location_key_bytes)?; 1595 self.feeds.remove(&*key_bytes)?; 1596 dangling_feed_keys_cleaned += 1; 1597 continue; 1598 } 1599 1600 live_records_found += 1; 1601 if live_records_found <= limit { 1602 continue; 1603 } 1604 if candidate_new_feed_lower_cursor.is_none() { 1605 candidate_new_feed_lower_cursor = Some(feed_key.cursor()); 1606 } 1607 1608 self.records.remove(&location_key_bytes)?; 1609 self.feeds.remove(key_bytes)?; 1610 records_deleted += 1; 1611 } 1612 1613 if !ended_early { 1614 if let Some(new_cursor) = candidate_new_feed_lower_cursor { 1615 self.global.insert( 1616 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?, 1617 &new_cursor.to_db_bytes()?, 1618 )?; 1619 } 1620 } 1621 1622 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})"); 1623 Ok((dangling_feed_keys_cleaned, records_deleted, ended_early)) 1624 } 1625 1626 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> { 1627 let mut records_deleted = 0; 1628 let mut batch = self.keyspace.batch(); 1629 let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?; 1630 for kv in self.records.prefix(prefix) { 1631 let (key_bytes, _) = kv?; 1632 batch.remove(&self.records, key_bytes); 1633 records_deleted += 1; 1634 if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS { 1635 counter!("storage_delete_account_partial_commits").increment(1); 1636 batch.commit()?; 1637 batch = self.keyspace.batch(); 1638 } 1639 } 1640 counter!("storage_delete_account_completions").increment(1); 1641 counter!("storage_delete_account_records_deleted").increment(records_deleted as u64); 1642 batch.commit()?; 1643 Ok(records_deleted) 1644 } 1645} 1646 1647pub struct FjallBackground(FjallWriter); 1648 1649#[async_trait] 1650impl StoreBackground for FjallBackground { 1651 async fn run(mut self, backfill: bool) -> StorageResult<()> { 1652 let mut dirty_nsids = HashSet::new(); 1653 1654 // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims 1655 // shorter once those are done helps things catch up 1656 // the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine 1657 let mut rollup = 1658 tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 })); 1659 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 1660 1661 // backfill condition again iffy. collection trims should probably happen in their own phase. 1662 let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 })); 1663 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 1664 1665 loop { 1666 tokio::select! 
            {
                _ = rollup.tick() => {
                    let mut db = self.0.clone();
                    let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
                    if n == 0 {
                        rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
                    }
                    dirty_nsids.extend(dirty);
                    log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
                },
                _ = trim.tick() => {
                    let n = dirty_nsids.len();
                    log::trace!("trimming {n} nsids: {dirty_nsids:?}");
                    let t0 = Instant::now();
                    let (mut total_danglers, mut total_deleted) = (0, 0);
                    let mut completed = HashSet::new();
                    for collection in &dirty_nsids {
                        let mut db = self.0.clone();
                        let c = collection.clone();
                        let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
                        total_danglers += danglers;
                        total_deleted += deleted;
                        if !ended_early {
                            completed.insert(collection.clone());
                        }
                        if total_deleted > 10_000_000 {
                            log::info!("trim stopped early, more than 10M records already deleted.");
                            break;
                        }
                    }
                    let dt = t0.elapsed();
                    log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
                    histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
                    histogram!("storage_trim_duration").record(dt.as_micros() as f64);
                    counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
                    if total_deleted >= total_danglers {
                        counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
                    } else {
                        // TODO: probably think through what's happening here
                        log::warn!("weird trim case: more danglers than deleted? metric will be missing for dangling=false. deleted={total_deleted} danglers={total_danglers}");
                    }
                    for c in completed {
                        dirty_nsids.remove(&c);
                    }
                },
            };
        }
    }
}

/// Get a value from a fixed key
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Get a value from a fixed key (snapshot version)
fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
    global: &fjall::Snapshot,
) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Set a value to a fixed key
fn insert_static_neu<K: StaticStr>(
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value_bytes = value.to_db_bytes()?;
    global.insert(&key_bytes, &value_bytes)?;
    Ok(())
}

/// Set a value to a fixed key, erroring if the value already exists
///
/// Intended for single-threaded init: not safe under concurrency, since there
/// is no transaction between checking if the value already exists and writing it.
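///
/// For example, `init` uses this for one-shot values: a second
/// `init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))` would
/// return `StorageError::InitError` rather than overwrite the stored takeoff time.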
fn init_static_neu<K: StaticStr>(
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    if global.get(&key_bytes)?.is_some() {
        return Err(StorageError::InitError(format!(
            "init failed: value for key {key_bytes:?} already exists"
        )));
    }
    let value_bytes = value.to_db_bytes()?;
    global.insert(&key_bytes, &value_bytes)?;
    Ok(())
}

/// Set a value at a fixed key as part of a write batch
fn insert_batch_static_neu<K: StaticStr>(
    batch: &mut FjallBatch,
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value_bytes = value.to_db_bytes()?;
    batch.insert(global, &key_bytes, &value_bytes);
    Ok(())
}

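/// Coarse stats about the underlying fjall keyspace: total disk space, journal
/// count, the keyspace write sequence number, and an approximate item count
/// for the `global` partition.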
"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1878 .parse() 1879 .unwrap(), 1880 ); 1881 1882 let event = CommitEvent { 1883 collection, 1884 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1885 rev: rev.unwrap_or("asdf").to_string(), 1886 operation: CommitOp::Update, 1887 record: Some(record), 1888 cid: Some(cid), 1889 }; 1890 1891 let (commit, collection) = 1892 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1893 .unwrap(); 1894 1895 self.batch 1896 .commits_by_nsid 1897 .entry(collection.clone()) 1898 .or_default() 1899 .truncating_insert(commit, &[0u8; 16]) 1900 .unwrap(); 1901 1902 collection 1903 } 1904 #[allow(clippy::too_many_arguments)] 1905 pub fn delete( 1906 &mut self, 1907 did: &str, 1908 collection: &str, 1909 rkey: &str, 1910 rev: Option<&str>, 1911 cursor: u64, 1912 ) -> Nsid { 1913 let did = Did::new(did.to_string()).unwrap(); 1914 let collection = Nsid::new(collection.to_string()).unwrap(); 1915 let event = CommitEvent { 1916 collection, 1917 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1918 rev: rev.unwrap_or("asdf").to_string(), 1919 operation: CommitOp::Delete, 1920 record: None, 1921 cid: None, 1922 }; 1923 1924 let (commit, collection) = 1925 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap(); 1926 1927 self.batch 1928 .commits_by_nsid 1929 .entry(collection.clone()) 1930 .or_default() 1931 .truncating_insert(commit, &[0u8; 16]) 1932 .unwrap(); 1933 1934 collection 1935 } 1936 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did { 1937 let did = Did::new(did.to_string()).unwrap(); 1938 self.batch.account_removes.push(DeleteAccount { 1939 did: did.clone(), 1940 cursor: Cursor::from_raw_u64(cursor), 1941 }); 1942 did 1943 } 1944 } 1945 1946 #[test] 1947 fn test_hello() -> anyhow::Result<()> { 1948 let (read, mut write) = fjall_db(); 1949 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1950 let JustCount { 1951 creates, 1952 dids_estimate, 1953 .. 1954 } = read.get_collection_counts( 1955 &Nsid::new("a.b.c".to_string()).unwrap(), 1956 beginning(), 1957 None, 1958 )?; 1959 assert_eq!(creates, 0); 1960 assert_eq!(dids_estimate, 0); 1961 Ok(()) 1962 } 1963 1964 #[test] 1965 fn test_insert_one() -> anyhow::Result<()> { 1966 let (read, mut write) = fjall_db(); 1967 1968 let mut batch = TestBatch::default(); 1969 let collection = batch.create( 1970 "did:plc:inze6wrmsm7pjl7yta3oig77", 1971 "a.b.c", 1972 "asdf", 1973 "{}", 1974 Some("rev-z"), 1975 None, 1976 100, 1977 ); 1978 write.insert_batch(batch.batch)?; 1979 write.step_rollup()?; 1980 1981 let JustCount { 1982 creates, 1983 dids_estimate, 1984 .. 1985 } = read.get_collection_counts(&collection, beginning(), None)?; 1986 assert_eq!(creates, 1); 1987 assert_eq!(dids_estimate, 1); 1988 let JustCount { 1989 creates, 1990 dids_estimate, 1991 .. 
    #[derive(Debug, Default)]
    struct TestBatch {
        pub batch: EventBatch<TEST_BATCH_LIMIT>,
    }

    impl TestBatch {
        #[allow(clippy::too_many_arguments)]
        pub fn create(
            &mut self,
            did: &str,
            collection: &str,
            rkey: &str,
            record: &str,
            rev: Option<&str>,
            cid: Option<Cid>,
            cursor: u64,
        ) -> Nsid {
            let did = Did::new(did.to_string()).unwrap();
            let collection = Nsid::new(collection.to_string()).unwrap();
            let record = RawValue::from_string(record.to_string()).unwrap();
            let cid = cid.unwrap_or(
                "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
                    .parse()
                    .unwrap(),
            );

            let event = CommitEvent {
                collection,
                rkey: RecordKey::new(rkey.to_string()).unwrap(),
                rev: rev.unwrap_or("asdf").to_string(),
                operation: CommitOp::Create,
                record: Some(record),
                cid: Some(cid),
            };

            let (commit, collection) =
                UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
                    .unwrap();

            self.batch
                .commits_by_nsid
                .entry(collection.clone())
                .or_default()
                .truncating_insert(commit, &[0u8; 16])
                .unwrap();

            collection
        }

        #[allow(clippy::too_many_arguments)]
        pub fn update(
            &mut self,
            did: &str,
            collection: &str,
            rkey: &str,
            record: &str,
            rev: Option<&str>,
            cid: Option<Cid>,
            cursor: u64,
        ) -> Nsid {
            let did = Did::new(did.to_string()).unwrap();
            let collection = Nsid::new(collection.to_string()).unwrap();
            let record = RawValue::from_string(record.to_string()).unwrap();
            let cid = cid.unwrap_or(
                "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
                    .parse()
                    .unwrap(),
            );

            let event = CommitEvent {
                collection,
                rkey: RecordKey::new(rkey.to_string()).unwrap(),
                rev: rev.unwrap_or("asdf").to_string(),
                operation: CommitOp::Update,
                record: Some(record),
                cid: Some(cid),
            };

            let (commit, collection) =
                UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
                    .unwrap();

            self.batch
                .commits_by_nsid
                .entry(collection.clone())
                .or_default()
                .truncating_insert(commit, &[0u8; 16])
                .unwrap();

            collection
        }

        #[allow(clippy::too_many_arguments)]
        pub fn delete(
            &mut self,
            did: &str,
            collection: &str,
            rkey: &str,
            rev: Option<&str>,
            cursor: u64,
        ) -> Nsid {
            let did = Did::new(did.to_string()).unwrap();
            let collection = Nsid::new(collection.to_string()).unwrap();
            let event = CommitEvent {
                collection,
                rkey: RecordKey::new(rkey.to_string()).unwrap(),
                rev: rev.unwrap_or("asdf").to_string(),
                operation: CommitOp::Delete,
                record: None,
                cid: None,
            };

            let (commit, collection) =
                UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap();

            self.batch
                .commits_by_nsid
                .entry(collection.clone())
                .or_default()
                .truncating_insert(commit, &[0u8; 16])
                .unwrap();

            collection
        }

        pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did {
            let did = Did::new(did.to_string()).unwrap();
            self.batch.account_removes.push(DeleteAccount {
                did: did.clone(),
                cursor: Cursor::from_raw_u64(cursor),
            });
            did
        }
    }

    #[test]
    fn test_hello() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();
        write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.b.c".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        Ok(())
    }

    #[test]
    fn test_insert_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "asdf",
            "{}",
            Some("rev-z"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("d.e.f".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), "{}");
        assert!(!rec.is_update);

        let records = read.get_records_by_collections(
            [Nsid::new("d.e.f".to_string()).unwrap()].into(),
            2,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn test_get_multi_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa",
            r#""earliest""#,
            Some("rev-a"),
            None,
            100,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.b",
            "aab",
            r#""in between""#,
            Some("rev-ab"),
            None,
            101,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa-2",
            r#""last""#,
            Some("rev-a-2"),
            None,
            102,
        );
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record.get(), r#""last""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );
        assert_eq!(records[1].record.get(), r#""in between""#);
        assert_eq!(
            records[1].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );
        assert_eq!(records[2].record.get(), r#""earliest""#);
        assert_eq!(
            records[2].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        Ok(())
    }

    #[test]
    fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // insert some older ones in aab
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.b",
                &format!("aab-{i}"),
                &format!(r#""b {i}""#),
                Some(&format!("rev-b-{i}")),
                None,
                100 + i,
            );
        }
        // and some newer ones in aaa
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.a",
                &format!("aaa-{i}"),
                &format!(r#""a {i}""#),
                Some(&format!("rev-a-{i}")),
                None,
                200 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            2,
            true,
        )?;
        assert_eq!(records.len(), 4);
        assert_eq!(records[0].record.get(), r#""a 3""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        assert_eq!(records[3].record.get(), r#""b 2""#);
        assert_eq!(
            records[3].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );

        Ok(())
    }

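    // updating a record replaces the stored value and flags `is_update`,
    // without bumping the collection's create count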
    #[test]
    fn test_update_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.update(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            r#"{"ch": "ch-ch-ch-changes"}"#,
            Some("rev-z"),
            None,
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
        assert!(rec.is_update);
        Ok(())
    }

    #[test]
    fn test_delete_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            Some("rev-z"),
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn test_collection_trim() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        let mut last_b_cursor;
        for i in 1..=10 {
            last_b_cursor = 11_000 + i;
            batch.create(
                &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
                "a.a.b",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                last_b_cursor,
            );
        }
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.c",
            "rkey-ccc",
            "{}",
            Some("rev-ccc"),
            None,
            12_000,
        );

        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 10);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

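        // trim every collection (including one that doesn't exist) down to a
        // limit of 6 records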
        write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 6);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn test_delete_account() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        for i in 1..=2 {
            batch.create(
                "did:plc:person-b",
                "a.a.a",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                11_000 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);

        let records_deleted =
            write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
        assert_eq!(records_deleted, 2);

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);

        Ok(())
    }

    #[test]
    fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
        write.insert_batch(batch.batch)?;

        write.step_rollup()?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn rollup_delete_live_count_step() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 10_001);
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 1);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

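        // a delete-account event queued behind the already-advanced rollup
        // cursor is never rolled up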
        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999);
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    #[test]
    fn rollup_multiple_count_batches() -> anyhow::Result<()> {
        let (_read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aab",
            "{}",
            Some("rev-aab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 2);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    #[test]
    fn counts_before_and_after_rollup() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-b",
            "a.a.a",
            "rkey-bbb",
            "{}",
            Some("rev-bbb"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 11_000);
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aac",
            "{}",
            Some("rev-aac"),
            None,
            12_000,
        );
        write.insert_batch(batch.batch)?;

        // before any rollup
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        // first batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // delete account rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // second batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 3);
        assert_eq!(dids_estimate, 2);

        // no more rollups left
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

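    // get_prefix: aggregated counts for everything strictly under an NSID
    // prefix. The collection exactly equal to the prefix and lexical
    // neighbours (e.g. "a.a.aa" for prefix "a.a.a") are excluded; direct
    // children come back as either Collection or Prefix entries.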
    #[test]
    fn get_prefix_children_lexi_empty() {
        let (read, _) = fjall_db();
        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read
            .get_prefix(
                NsidPrefix::new("aaa.aaa").unwrap(),
                10,
                OrderCollectionsBy::Lexi { cursor: None },
                None,
                None,
            )
            .unwrap();

        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
    }

    #[test]
    fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.aa",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Collection(NsidCount {
                nsid: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            })]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1,
            })]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-a",
            "a.a.a.b",
            "rkey-aaab",
            "{}",
            Some("rev-aaab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 2,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            })]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

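    // the same NSID segment can show up both as a collection child and as a
    // prefix child when deeper collections exist beneath it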
    #[test]
    fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // exact:
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        // child:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_001,
        );
        // prefix:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a.a",
            "rkey-aaaaa",
            "{}",
            Some("rev-aaaaa"),
            None,
            10_002,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![
                PrefixChild::Collection(NsidCount {
                    nsid: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
                PrefixChild::Prefix(PrefixCount {
                    prefix: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
            ]
        );
        assert_eq!(cursor, None);
        Ok(())
    }
}