Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use crate::db_types::{
    db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr, SubPrefixBytes,
};
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
    DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
    HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
    JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
    NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
    RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
    SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank, HOUR_IN_MICROS,
    WEEK_IN_MICROS,
};
use crate::{
    nice_duration, CommitAction, ConsumerInfo, Did, EncodingError, EventBatch, JustCount, Nsid,
    NsidCount, NsidPrefix, OrderCollectionsBy, PrefixChild, PrefixCount, UFOsRecord,
};
use async_trait::async_trait;
use fjall::{
    Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
use std::path::Path;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::{Duration, Instant, SystemTime};

const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
const MAX_BATCHED_ROLLUP_COUNTS: usize = 256;

///
/// new data format, roughly:
///
/// Partition: 'global'
///
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
///   - key: "js_cursor" (literal)
///   - val: u64
///
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
///   - key: "js_endpoint" (literal)
///   - val: string (URL of the instance)
///
/// - Launch date
///   - key: "takeoff" (literal)
///   - val: u64 (micros timestamp, not from jetstream for now so not precise)
///
/// - Cardinality estimator secret
///   - key: "sketch_secret" (literal)
///   - val: [u8; 16]
///
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
///   - key: "rollup_cursor" (literal)
///   - val: u64 (tracks behind js_cursor)
///
/// - Feed trim cursor (bg work: delete oldest excess records)
///   - key: "trim_cursor" || nullstr (nsid)
///   - val: u64 (earliest previously-removed feed entry jetstream cursor)
///
/// Partition: 'feed'
///
/// - Per-collection list of record references ordered by jetstream cursor
///   - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
///   - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
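///   - e.g. (illustrative only, not the exact byte encoding): key is roughly
///     "app.bsky.feed.post" + '\0' + the u64 jetstream cursor; val is roughly
///     did + '\0' + rkey + '\0' + rev + '\0'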
///
///
/// Partition: 'records'
///
/// - Actual records by their atproto location
///   - key: nullstr || nullstr || nullstr (did, collection, rkey)
///   - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
///
///
/// Partition: 'rollups'
///
/// - Live (batched) records counts and dids estimate per collection
///   - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
///
/// - Hourly total record counts and dids estimate per collection
///   - key: "hourly_counts" || u64 || nullstr (hour, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Hourly record count ranking
///   - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid)
///   - val: [empty]
///
/// - Hourly did estimate ranking
///   - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid)
///   - val: [empty]
///
///
/// - Weekly total record counts and dids estimate per collection
///   - key: "weekly_counts" || u64 || nullstr (week, nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - Weekly record count ranking
///   - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid)
///   - val: [empty]
///
/// - Weekly did estimate ranking
///   - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid)
///   - val: [empty]
///
///
/// - All-time total record counts and dids estimate per collection
///   - key: "ever_counts" || nullstr (nsid)
///   - val: u64 || HLL (count (not cursor), estimator)
///
/// - All-time record count ranking
///   - key: "ever_rank_records" || u64 || nullstr (count, nsid)
///   - val: [empty]
///
/// - All-time did estimate ranking
///   - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid)
///   - val: [empty]
///
///
/// Partition: 'queues'
///
/// - Delete account queue
///   - key: "delete_acount" || u64 (js_cursor)
///   - val: nullstr (did)
///
///
/// TODO: moderation actions
/// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
#[derive(Debug)]
pub struct FjallStorage {}

#[derive(Debug, Default)]
pub struct FjallConfig {
    /// drop the db when the storage is dropped
    ///
    /// this is only meant for tests
    #[cfg(test)]
    pub temp: bool,
}
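// Illustrative usage sketch (not part of this module's docs; the path and endpoint below are
// placeholders): `init` hands back a reader, a writer, the persisted jetstream cursor (if any),
// and the cardinality-sketch secret:
//
//     let (reader, writer, cursor, sketch_secret) = FjallStorage::init(
//         "./ufos-db",
//         "wss://jetstream.example".to_string(),
//         false, // force_endpoint
//         FjallConfig::default(),
//     )?;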
impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
    fn init(
        path: impl AsRef<Path>,
        endpoint: String,
        force_endpoint: bool,
        _config: FjallConfig,
    ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
        let keyspace = {
            let config = Config::new(path);

            // #[cfg(not(test))]
            // let config = config.fsync_ms(Some(4_000));

            config.open()?
        };

        let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
        let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
        let records = keyspace.open_partition("records", PartitionCreateOptions::default())?;
        let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?;
        let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?;

        let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;

        let sketch_secret = if js_cursor.is_some() {
            let stored_endpoint =
                get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
            let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
                "found cursor but missing js_endpoint, refusing to start.".to_string(),
            ))?;

            let Some(stored_secret) =
                get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
            else {
                return Err(StorageError::InitError(
                    "found cursor but missing sketch_secret, refusing to start.".to_string(),
                ));
            };

            if stored != endpoint {
                if force_endpoint {
                    log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
                    insert_static_neu::<JetstreamEndpointKey>(
                        &global,
                        JetstreamEndpointValue(endpoint.to_string()),
                    )?;
                } else {
                    return Err(StorageError::InitError(format!(
                        "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start without --jetstream-force.")));
                }
            }
            stored_secret
        } else {
            log::info!("initializing a fresh db!");
            init_static_neu::<JetstreamEndpointKey>(
                &global,
                JetstreamEndpointValue(endpoint.to_string()),
            )?;

            log::info!("generating new secret for cardinality sketches...");
            let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
            getrandom::fill(&mut sketch_secret).map_err(|e| {
                StorageError::InitError(format!(
                    "failed to get a random secret for cardinality sketches: {e:?}"
                ))
            })?;
            init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;

            init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
            init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;

            sketch_secret
        };

        let reader = FjallReader {
            keyspace: keyspace.clone(),
            global: global.clone(),
            feeds: feeds.clone(),
            records: records.clone(),
            rollups: rollups.clone(),
        };
        let writer = FjallWriter {
            bg_taken: Arc::new(AtomicBool::new(false)),
            keyspace,
            global,
            feeds,
            records,
            rollups,
            queues,
        };
        Ok((reader, writer, js_cursor, sketch_secret))
    }
}

type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>;

#[derive(Clone)]
pub struct FjallReader {
    keyspace: Keyspace,
    global: PartitionHandle,
    feeds: PartitionHandle,
    records: PartitionHandle,
    rollups: PartitionHandle,
}

/// An iterator that knows how to skip over deleted/invalidated records
struct RecordIterator {
    db_iter: Box<dyn Iterator<Item = FjallRKV>>,
    records: PartitionHandle,
    limit: usize,
    fetched: usize,
}
impl RecordIterator {
    pub fn new(
        feeds: &PartitionHandle,
        records: PartitionHandle,
        collection: &Nsid,
        limit: usize,
    ) -> StorageResult<Self> {
        let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
        let db_iter = feeds.prefix(prefix).rev();
        Ok(Self {
            db_iter: Box::new(db_iter),
            records,
            limit,
            fetched: 0,
        })
    }
    fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> {
        let (key_bytes, val_bytes) = db_next?;
        let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
        let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
        let location_key: RecordLocationKey = (&feed_key, &feed_val).into();

        let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
            // record was deleted (hopefully)
            return Ok(None);
        };

        let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;

        if meta.cursor() != feed_key.cursor() {
            // older/different version
            return Ok(None);
        }
        if meta.rev != feed_val.rev() {
            // weird...
            log::warn!("record lookup: cursor match but rev did not...? excluding.");
            return Ok(None);
        }
        let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
            log::warn!(
                "record lookup: found record but could not get bytes to decode the record??"
            );
            return Ok(None);
        };
        let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
        Ok(Some(UFOsRecord {
            collection: feed_key.collection().clone(),
            cursor: feed_key.cursor(),
            did: feed_val.did().clone(),
            rkey: feed_val.rkey().clone(),
            rev: meta.rev.to_string(),
            record: rawval.try_into()?,
            is_update: meta.is_update,
        }))
    }
}
impl Iterator for RecordIterator {
    type Item = StorageResult<Option<UFOsRecord>>;
    fn next(&mut self) -> Option<Self::Item> {
        if self.fetched == self.limit {
            return Some(Ok(None));
        }
        let record = loop {
            let db_next = self.db_iter.next()?; // None short-circuits here
            match self.get_record(db_next) {
                Err(e) => return Some(Err(e)),
                Ok(Some(record)) => break record,
                Ok(None) => continue,
            }
        };
        self.fetched += 1;
        Some(Ok(Some(record)))
    }
}

type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>;
type GetByterCounts = StorageResult<(Nsid, GetCounts)>;
type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>;
fn get_lexi_iter<T: WithCollection + DbBytes + 'static>(
    snapshot: &Snapshot,
    start: Bound<Vec<u8>>,
    end: Bound<Vec<u8>>,
) -> StorageResult<NsidCounter> {
    Ok(Box::new(snapshot.range((start, end)).map(|kv| {
        let (k_bytes, v_bytes) = kv?;
        let key = db_complete::<T>(&k_bytes)?;
        let nsid = key.collection().clone();
        let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?));
        Ok((nsid, get_counts))
    })))
}
type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
    snapshot: lsm_tree::Snapshot,
    start: Bound<Vec<u8>>,
    end: Bound<Vec<u8>>,
    get_rollup_key: GetRollupKey,
) -> StorageResult<NsidCounter> {
    Ok(Box::new(snapshot.range((start, end)).rev().map(
        move |kv| {
            let (k_bytes, _) = kv?;
            let key = db_complete::<T>(&k_bytes)?;
            let nsid = key.collection().clone();
            let get_counts: GetCounts = Box::new({
                let nsid = nsid.clone();
                let snapshot = snapshot.clone();
                let get_rollup_key = get_rollup_key.clone();
                move || {
                    let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
                        "integrity: all-time rank rollup must have corresponding all-time count rollup",
                    );
                    Ok(db_complete::<CountsValue>(&db_count_bytes)?)
                }
            });
            Ok((nsid, get_counts))
        },
    )))
}

type CollectionSerieses = HashMap<Nsid, Vec<CountsValue>>;

impl FjallReader {
    fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
        let rollup_cursor =
            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
                .map(|c| c.to_raw_u64());

        Ok(serde_json::json!({
            "keyspace_disk_space": self.keyspace.disk_space(),
            "keyspace_journal_count": self.keyspace.journal_count(),
            "keyspace_sequence": self.keyspace.instant(),
            "rollup_cursor": rollup_cursor,
        }))
    }

    fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
        let global = self.global.snapshot();

        let endpoint =
            get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
                .ok_or(StorageError::BadStateError(
                    "Could not find jetstream endpoint".to_string(),
                ))?
                .0;

        let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
            .ok_or(StorageError::BadStateError(
                "Could not find jetstream takeoff time".to_string(),
            ))?
            .to_raw_u64();

        let latest_cursor =
            get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
                .map(|c| c.to_raw_u64());

        let rollup_cursor =
            get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?
                .map(|c| c.to_raw_u64());

        Ok(ConsumerInfo::Jetstream {
            endpoint,
            started_at,
            latest_cursor,
            rollup_cursor,
        })
    }

    fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
        let cursor = rollups
            .unwrap_or(&self.rollups.snapshot())
            .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
            .next()
            .transpose()?
            .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
            .transpose()?
            .map(|key| key.cursor())
            .unwrap_or_else(|| Cursor::from_start().into());
        Ok(cursor)
    }

    fn get_lexi_collections(
        &self,
        snapshot: Snapshot,
        limit: usize,
        cursor: Option<Vec<u8>>,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
        let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
        for bucket in &buckets {
            let it: NsidCounter = match bucket {
                CursorBucket::Hour(t) => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
                        .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
                    let end = HourlyRollupKey::end(*t)?;
                    get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::Week(t) => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
                        .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
                    let end = WeeklyRollupKey::end(*t)?;
                    get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::AllTime => {
                    let start = cursor_nsid
                        .as_ref()
                        .map(AllTimeRollupKey::after_nsid)
                        .unwrap_or_else(AllTimeRollupKey::start)?;
                    let end = AllTimeRollupKey::end()?;
                    get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
                }
            };
            iters.push(it.peekable());
        }

        let mut out = Vec::new();
        let mut current_nsid = None;
        for _ in 0..limit {
            // double-scan the iters for each element: this could be eliminated but we're starting simple.
            // first scan: find the lowest nsid
            // second scan: take + merge, and advance all iters with lowest nsid
            let mut lowest: Option<Nsid> = None;
            for iter in &mut iters {
                if let Some(bla) = iter.peek_mut() {
                    let (nsid, _) = match bla {
                        Ok(v) => v,
                        Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                    };
                    lowest = match lowest {
                        Some(ref current) if nsid.as_str() > current.as_str() => lowest,
                        _ => Some(nsid.clone()),
                    };
                }
            }
            current_nsid = lowest.clone();
            let Some(nsid) = lowest else { break };

            let mut merged = CountsValue::default();
            for iter in &mut iters {
                // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
                if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) {
                    let counts = get_counts()?;
                    merged.merge(&counts);
                }
            }
            out.push(NsidCount::new(&nsid, &merged));
        }

        let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
        Ok((out, next_cursor))
    }

    fn get_ordered_collections(
        &self,
        snapshot: Snapshot,
        limit: usize,
        order: OrderCollectionsBy,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<Vec<NsidCount>> {
        let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());

        for bucket in buckets {
            let it: NsidCounter = match (&order, bucket) {
                (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
                    get_lookup_iter::<HourlyRecordsKey>(
                        snapshot.clone(),
                        HourlyRecordsKey::start(t)?,
                        HourlyRecordsKey::end(t)?,
                        Arc::new({
                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
                    get_lookup_iter::<HourlyDidsKey>(
                        snapshot.clone(),
                        HourlyDidsKey::start(t)?,
                        HourlyDidsKey::end(t)?,
                        Arc::new({
                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
                    get_lookup_iter::<WeeklyRecordsKey>(
                        snapshot.clone(),
                        WeeklyRecordsKey::start(t)?,
                        WeeklyRecordsKey::end(t)?,
                        Arc::new({
                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
                    get_lookup_iter::<WeeklyDidsKey>(
                        snapshot.clone(),
                        WeeklyDidsKey::start(t)?,
                        WeeklyDidsKey::end(t)?,
                        Arc::new({
                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
                        }),
                    )?
                }
                (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
                    get_lookup_iter::<AllTimeRecordsKey>(
                        snapshot.clone(),
                        AllTimeRecordsKey::start()?,
                        AllTimeRecordsKey::end()?,
                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
                    )?
                }
                (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
                    get_lookup_iter::<AllTimeDidsKey>(
                        snapshot.clone(),
                        AllTimeDidsKey::start()?,
                        AllTimeDidsKey::end()?,
                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
                    )?
                }
                (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
            };
            iters.push(it);
        }

        // overfetch by taking a bit more than the limit
        // merge by collection
        // sort by requested order, take limit, discard all remaining
        //
        // this isn't guaranteed to be correct, but it will hopefully be close most of the time:
        // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
        // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah.
        //
        // this thing is heavy, there's probably a better way
        let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
        for iter in iters {
            for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
                let (nsid, get_counts) = pair?;
                let counts = get_counts()?;
                ranked.entry(nsid).or_default().merge(&counts);
            }
        }
        let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
        match order {
            OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
            OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
            OrderCollectionsBy::Lexi { .. } => unreachable!(),
        }
        let counts = ranked
            .into_iter()
            .rev()
            .take(limit)
            .map(|(nsid, cv)| NsidCount::new(&nsid, &cv))
            .collect();
        Ok(counts)
    }

    fn get_collections(
        &self,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let snapshot = self.rollups.snapshot();
        let buckets = if let (None, None) = (since, until) {
            vec![CursorBucket::AllTime]
        } else {
            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
            if let Some(specified) = since {
                if specified > lower {
                    lower = specified;
                }
            }
            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
            CursorBucket::buckets_spanning(lower, upper)
        };
        match order {
            OrderCollectionsBy::Lexi { cursor } => {
                self.get_lexi_collections(snapshot, limit, cursor, buckets)
            }
            _ => Ok((
                self.get_ordered_collections(snapshot, limit, order, buckets)?,
                None,
            )),
        }
    }

    fn get_lexi_prefix(
        &self,
        snapshot: Snapshot,
        prefix: NsidPrefix,
        limit: usize,
        cursor: Option<Vec<u8>>,
        buckets: Vec<CursorBucket>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        // let prefix_sub_with_null = prefix.as_str().to_string().to_db_bytes()?;
        let prefix_sub = String::sub_prefix(&prefix.terminated())?; // with trailing dot to ensure full segment match
        let cursor_child = cursor
            .as_deref()
            .map(|encoded_bytes| {
                let decoded: String = db_complete(encoded_bytes)?;
                // TODO: write some tests for cursors, there's probably bugs here
                let as_sub_prefix_with_null = decoded.to_db_bytes()?;
                Ok::<_, EncodingError>(as_sub_prefix_with_null)
            })
            .transpose()?;
        let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
        for bucket in &buckets {
            let it: NsidCounter = match bucket {
                CursorBucket::Hour(t) => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| HourlyRollupKey::after_nsid_prefix(*t, child))
                        .unwrap_or_else(|| HourlyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
                    let end = HourlyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
                    get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::Week(t) => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| WeeklyRollupKey::after_nsid_prefix(*t, child))
                        .unwrap_or_else(|| WeeklyRollupKey::after_nsid_prefix(*t, &prefix_sub))?;
                    let end = WeeklyRollupKey::nsid_prefix_end(*t, &prefix_sub)?;
                    get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
                }
                CursorBucket::AllTime => {
                    let start = cursor_child
                        .as_ref()
                        .map(|child| AllTimeRollupKey::after_nsid_prefix(child))
                        .unwrap_or_else(|| AllTimeRollupKey::after_nsid_prefix(&prefix_sub))?;
                    let end = AllTimeRollupKey::nsid_prefix_end(&prefix_sub)?;
                    get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
                }
            };
            iters.push(it);
        }

        // with apologies
        let mut iters: Vec<_> = iters
            .into_iter()
            .map(|it| {
                it.map(|bla| {
                    bla.map(|(nsid, v)| {
                        let Some(child) = Child::from_prefix(&nsid, &prefix) else {
                            panic!("failed from_prefix: {nsid:?} {prefix:?} (bad iter bounds?)");
                        };
                        (child, v)
                    })
                })
                .peekable()
            })
            .collect();

        let mut items = Vec::new();
        let mut prefix_count = CountsValue::default();
        #[derive(Debug, Clone, PartialEq)]
        enum Child {
            FullNsid(Nsid),
            ChildPrefix(String),
        }
        impl Child {
            fn from_prefix(nsid: &Nsid, prefix: &NsidPrefix) -> Option<Self> {
                if prefix.is_group_of(nsid) {
                    return Some(Child::FullNsid(nsid.clone()));
                }
                let suffix = nsid.as_str().strip_prefix(&format!("{}.", prefix.0))?;
                let (segment, _) = suffix.split_once('.').unwrap();
                let child_prefix = format!("{}.{segment}", prefix.0);
                Some(Child::ChildPrefix(child_prefix))
            }
            fn is_before(&self, other: &Child) -> bool {
                match (self, other) {
                    (Child::FullNsid(s), Child::ChildPrefix(o)) if s.as_str() == o => true,
                    (Child::ChildPrefix(s), Child::FullNsid(o)) if s == o.as_str() => false,
                    (Child::FullNsid(s), Child::FullNsid(o)) => s.as_str() < o.as_str(),
                    (Child::ChildPrefix(s), Child::ChildPrefix(o)) => s < o,
                    (Child::FullNsid(s), Child::ChildPrefix(o)) => s.to_string() < *o,
                    (Child::ChildPrefix(s), Child::FullNsid(o)) => *s < o.to_string(),
                }
            }
            fn into_inner(self) -> String {
                match self {
                    Child::FullNsid(s) => s.to_string(),
                    Child::ChildPrefix(s) => s,
                }
            }
        }
        let mut current_child: Option<Child> = None;
        for _ in 0..limit {
            // double-scan the iters for each element: this could be eliminated but we're starting simple.
            // first scan: find the lowest nsid
            // second scan: take + merge, and advance all iters with lowest nsid
            let mut lowest: Option<Child> = None;
            for iter in &mut iters {
                if let Some(bla) = iter.peek_mut() {
                    let (child, _) = match bla {
                        Ok(v) => v,
                        Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                    };

                    lowest = match lowest {
                        Some(ref current) if current.is_before(child) => lowest,
                        _ => Some(child.clone()),
                    };
                }
            }
            current_child = lowest.clone();
            let Some(child) = lowest else { break };

            let mut merged = CountsValue::default();
            for iter in &mut iters {
                // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
                while let Some(Ok((_, get_counts))) =
                    iter.next_if(|v| v.as_ref().unwrap().0 == child)
                {
                    let counts = get_counts()?;
                    prefix_count.merge(&counts);
                    merged.merge(&counts);
                }
            }
            items.push(match child {
                Child::FullNsid(nsid) => PrefixChild::Collection(NsidCount::new(&nsid, &merged)),
                Child::ChildPrefix(prefix) => {
                    PrefixChild::Prefix(PrefixCount::new(&prefix, &merged))
                }
            });
        }

        // TODO: could serialize the prefix count (with sketch) into the cursor so that uniqs can actually count up?
        // ....er the sketch is probably too big
        // TODO: this is probably buggy on child-type boundaries bleh
        let next_cursor = current_child
            .map(|s| s.into_inner().to_db_bytes())
            .transpose()?;

        Ok(((&prefix_count).into(), items, next_cursor))
    }

    fn get_prefix(
        &self,
        prefix: NsidPrefix,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        let snapshot = self.rollups.snapshot();
        let buckets = if let (None, None) = (since, until) {
            vec![CursorBucket::AllTime]
        } else {
            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
            if let Some(specified) = since {
                if specified > lower {
                    lower = specified;
                }
            }
            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
            CursorBucket::buckets_spanning(lower, upper)
        };
        match order {
            OrderCollectionsBy::Lexi { cursor } => {
                self.get_lexi_prefix(snapshot, prefix, limit, cursor, buckets)
            }
            _ => todo!(),
        }
    }

    /// - step: output series time step, in seconds
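    /// - e.g. (sketch, assuming `step` is a whole number of hours): step = 3600 yields one point
    ///   per hourly rollup, while step = 86_400 merges 24 hourly rollups into each output point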
    fn get_timeseries(
        &self,
        collections: Vec<Nsid>,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
        if step > WEEK_IN_MICROS {
            panic!("week-stepping is todo");
        }
        let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
        let Ok(dt) = Cursor::from(until).duration_since(&Cursor::from(since)) else {
            return Ok((
                // empty: until < since
                vec![],
                collections.into_iter().map(|c| (c, vec![])).collect(),
            ));
        };
        let n_hours = (dt.as_micros() as u64) / HOUR_IN_MICROS;
        let mut counts_by_hour = Vec::with_capacity(n_hours as usize);
        let snapshot = self.rollups.snapshot();
        for hour in (0..n_hours).map(|i| since.nth_next(i)) {
            let mut counts = Vec::with_capacity(collections.len());
            for nsid in &collections {
                let count = snapshot
                    .get(&HourlyRollupKey::new(hour, nsid).to_db_bytes()?)?
                    .as_deref()
                    .map(db_complete::<CountsValue>)
                    .transpose()?
                    .unwrap_or_default();
                counts.push(count);
            }
            counts_by_hour.push((hour, counts));
        }

        let step_hours = step / (HOUR_IN_MICROS / 1_000_000);
        let mut output_hours = Vec::with_capacity(step_hours as usize);
        let mut output_series: CollectionSerieses = collections
            .iter()
            .map(|c| (c.clone(), Vec::with_capacity(step_hours as usize)))
            .collect();

        for chunk in counts_by_hour.chunks(step_hours as usize) {
            output_hours.push(chunk[0].0); // always guaranteed to have at least one element in a chunks chunk
            for (i, collection) in collections.iter().enumerate() {
                let mut c = CountsValue::default();
                for (_, counts) in chunk {
                    c.merge(&counts[i]);
                }
                output_series
                    .get_mut(collection)
                    .expect("output series is initialized with all collections")
                    .push(c);
            }
        }

        Ok((output_hours, output_series))
    }

    fn get_collection_counts(
        &self,
        collection: &Nsid,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<JustCount> {
        // grab snapshots in case rollups happen while we're working
        let rollups = self.rollups.snapshot();

        let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
        let buckets = CursorBucket::buckets_spanning(since, until);
        let mut total_counts = CountsValue::default();

        for bucket in buckets {
            let key = match bucket {
                CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?,
                CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?,
                CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset?
            };
            let count = rollups
                .get(&key)?
                .as_deref()
                .map(db_complete::<CountsValue>)
                .transpose()?
                .unwrap_or_default();
            total_counts.merge(&count);
        }

        Ok((&total_counts).into())
    }

    fn get_records_by_collections(
        &self,
        collections: HashSet<Nsid>,
        limit: usize,
        expand_each_collection: bool,
    ) -> StorageResult<Vec<UFOsRecord>> {
        if collections.is_empty() {
            return Ok(vec![]);
        }
        let mut record_iterators = Vec::new();
        for collection in collections {
            let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?;
            record_iterators.push(iter.peekable());
        }
        let mut merged = Vec::new();
        loop {
            let mut latest: Option<(Cursor, usize)> = None; // ugh
            for (i, iter) in record_iterators.iter_mut().enumerate() {
                let Some(it) = iter.peek_mut() else {
                    continue;
                };
                let it = match it {
                    Ok(v) => v,
                    Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
                };
                let Some(rec) = it else {
                    if expand_each_collection {
                        continue;
                    } else {
                        break;
                    }
                };
                if let Some((cursor, _)) = latest {
                    if rec.cursor > cursor {
                        latest = Some((rec.cursor, i))
                    }
                } else {
                    latest = Some((rec.cursor, i));
                }
            }
            let Some((_, idx)) = latest else {
                break;
            };
            // yeah yeah whateverrrrrrrrrrrrrrrr
            merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap());
        }
        Ok(merged)
    }

    fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
        let start = AllTimeRollupKey::start()?;
        let end = AllTimeRollupKey::end()?;
        let mut matches = Vec::new();
        let limit = 16; // TODO: param
        for kv in self.rollups.range((start, end)) {
            let (key_bytes, val_bytes) = kv?;
            let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
            let nsid = key.collection();
            for term in &terms {
                if nsid.contains(term) {
                    let counts = db_complete::<CountsValue>(&val_bytes)?;
                    matches.push(NsidCount::new(nsid, &counts));
                    break;
                }
            }
            if matches.len() >= limit {
                break;
            }
        }
        // TODO: indicate incomplete results
        Ok(matches)
    }
}

#[async_trait]
impl StoreReader for FjallReader {
    fn name(&self) -> String {
        "fjall storage v2".into()
    }
    async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await?
    }
    async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
    }
    async fn get_collections(
        &self,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_collections(&s, limit, order, since, until)
        })
        .await?
    }
    async fn get_prefix(
        &self,
        prefix: NsidPrefix,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_prefix(&s, prefix, limit, order, since, until)
        })
        .await?
    }
    async fn get_timeseries(
        &self,
        collections: Vec<Nsid>,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, CollectionSerieses)> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_timeseries(&s, collections, since, until, step)
        })
        .await?
    }
    async fn get_collection_counts(
        &self,
        collection: &Nsid,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<JustCount> {
        let s = self.clone();
        let collection = collection.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_collection_counts(&s, &collection, since, until)
        })
        .await?
    }
    async fn get_records_by_collections(
        &self,
        collections: HashSet<Nsid>,
        limit: usize,
        expand_each_collection: bool,
    ) -> StorageResult<Vec<UFOsRecord>> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || {
            FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection)
        })
        .await?
    }
    async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>> {
        let s = self.clone();
        tokio::task::spawn_blocking(move || FjallReader::search_collections(&s, terms)).await?
    }
}

#[derive(Clone)]
pub struct FjallWriter {
    bg_taken: Arc<AtomicBool>,
    keyspace: Keyspace,
    global: PartitionHandle,
    feeds: PartitionHandle,
    records: PartitionHandle,
    rollups: PartitionHandle,
    queues: PartitionHandle,
}

impl FjallWriter {
    fn rollup_delete_account(
        &mut self,
        cursor: Cursor,
        key_bytes: &[u8],
        val_bytes: &[u8],
    ) -> StorageResult<usize> {
        let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?;
        self.delete_account(&did)?;
        let mut batch = self.keyspace.batch();
        batch.remove(&self.queues, key_bytes);
        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
        batch.commit()?;
        Ok(1)
    }

    fn rollup_live_counts(
        &mut self,
        timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>,
        cursor_exclusive_limit: Option<Cursor>,
        rollup_limit: usize,
    ) -> StorageResult<(usize, HashSet<Nsid>)> {
        // current strategy is to buffer counts in mem before writing the rollups
        // we *could* read+write every single batch to rollup.. but their merge is associative,
        // so save the db some work up front? is this worth it? who knows...
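        // (e.g.: merging batches {creates: 2} and then {creates: 3} into the stored rollup one at
        // a time ends up the same as merging their in-memory sum {creates: 5} once)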
        let mut dirty_nsids = HashSet::new();

        #[derive(Eq, Hash, PartialEq)]
        enum Rollup {
            Hourly(HourTruncatedCursor),
            Weekly(WeekTruncatedCursor),
            AllTime,
        }

        let mut batch = self.keyspace.batch();
        let mut cursors_advanced = 0;
        let mut last_cursor = Cursor::from_start();
        let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();

        for (i, kv) in timelies.enumerate() {
            if i >= rollup_limit {
                break;
            }

            let (key_bytes, val_bytes) = kv?;
            let key = db_complete::<LiveCountsKey>(&key_bytes)?;

            if cursor_exclusive_limit
                .map(|limit| key.cursor() > limit)
                .unwrap_or(false)
            {
                break;
            }

            dirty_nsids.insert(key.collection().clone());

            batch.remove(&self.rollups, key_bytes);
            let val = db_complete::<CountsValue>(&val_bytes)?;
            counts_by_rollup
                .entry((
                    key.collection().clone(),
                    Rollup::Hourly(key.cursor().into()),
                ))
                .or_default()
                .merge(&val);
            counts_by_rollup
                .entry((
                    key.collection().clone(),
                    Rollup::Weekly(key.cursor().into()),
                ))
                .or_default()
                .merge(&val);
            counts_by_rollup
                .entry((key.collection().clone(), Rollup::AllTime))
                .or_default()
                .merge(&val);

            cursors_advanced += 1;
            last_cursor = key.cursor();
        }

        // go through each new rollup thing and merge it with whatever might already be in the db
        for ((nsid, rollup), counts) in counts_by_rollup {
            let rollup_key_bytes = match rollup {
                Rollup::Hourly(hourly_cursor) => {
                    HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
                }
                Rollup::Weekly(weekly_cursor) => {
                    WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()?
                }
                Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?,
            };
            let mut rolled: CountsValue = self
                .rollups
                .get(&rollup_key_bytes)?
                .as_deref()
                .map(db_complete::<CountsValue>)
                .transpose()?
                .unwrap_or_default();

            // now that we have values, we can know the existing ranks
            let before_creates_count = rolled.counts().creates;
            let before_dids_estimate = rolled.dids().estimate() as u64;

            // update the rollup
            rolled.merge(&counts);

            // new ranks
            let new_creates_count = rolled.counts().creates;
            let new_dids_estimate = rolled.dids().estimate() as u64;

            // update create-ranked secondary index if rank changed
            if new_creates_count != before_creates_count {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
                        AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
                    ),
                };
                batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
                batch.insert(&self.rollups, &new_k, "");
            }

            // update dids-ranked secondary index if rank changed
            if new_dids_estimate != before_dids_estimate {
                let (old_k, new_k) = match rollup {
                    Rollup::Hourly(cursor) => (
                        HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::Weekly(cursor) => (
                        WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                        WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
                            .to_db_bytes()?,
                    ),
                    Rollup::AllTime => (
                        AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
                        AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
                    ),
                };
                batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
                batch.insert(&self.rollups, &new_k, "");
            }

            // replace the main counts rollup
            batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
        }

        insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;

        batch.commit()?;
        Ok((cursors_advanced, dirty_nsids))
    }
}

impl StoreWriter<FjallBackground> for FjallWriter {
    fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
        if self.bg_taken.swap(true, Ordering::SeqCst) {
            return Err(StorageError::BackgroundAlreadyStarted);
        }
        describe_histogram!(
            "storage_trim_dirty_nsids",
            Unit::Count,
            "number of NSIDs trimmed"
        );
        describe_histogram!(
            "storage_trim_duration",
            Unit::Microseconds,
            "how long it took to trim the dirty NSIDs"
        );
        describe_counter!(
            "storage_trim_removed",
            Unit::Count,
            "how many records were removed during trim"
        );
        if reroll {
            log::info!("reroll: resetting rollup cursor...");
            insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
            log::info!("reroll: clearing trim cursors...");
            let mut batch = self.keyspace.batch();
            for kv in self
                .global
                .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
                    &Default::default(),
                )?)
            {
                let (k, _) = kv?;
                batch.remove(&self.global, k);
            }
            let n = batch.len();
            batch.commit()?;
            log::info!("reroll: cleared {n} trim cursors.");
        }
        Ok(FjallBackground(self.clone()))
    }

    fn insert_batch<const LIMIT: usize>(
        &mut self,
        event_batch: EventBatch<LIMIT>,
    ) -> StorageResult<()> {
        if event_batch.is_empty() {
            return Ok(());
        }

        let mut batch = self.keyspace.batch();

        // would be nice not to have to iterate everything at once here
        let latest = event_batch.latest_cursor().unwrap();

        for (nsid, commits) in event_batch.commits_by_nsid {
            for commit in commits.commits {
                let location_key: RecordLocationKey = (&commit, &nsid).into();

                match commit.action {
                    CommitAction::Cut => {
                        batch.remove(&self.records, &location_key.to_db_bytes()?);
                    }
                    CommitAction::Put(put_action) => {
                        let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
                        let feed_val: NsidRecordFeedVal =
                            (&commit.did, &commit.rkey, commit.rev.as_str()).into();
                        batch.insert(
                            &self.feeds,
                            feed_key.to_db_bytes()?,
                            feed_val.to_db_bytes()?,
                        );

                        let location_val: RecordLocationVal =
                            (commit.cursor, commit.rev.as_str(), put_action).into();
                        batch.insert(
                            &self.records,
                            &location_key.to_db_bytes()?,
                            &location_val.to_db_bytes()?,
                        );
                    }
                }
            }
            let live_counts_key: LiveCountsKey = (latest, &nsid).into();
            let counts_value = CountsValue::new(
                CommitCounts {
                    creates: commits.creates as u64,
                    updates: commits.updates as u64,
                    deletes: commits.deletes as u64,
                },
                commits.dids_estimate,
            );
            batch.insert(
                &self.rollups,
                &live_counts_key.to_db_bytes()?,
                &counts_value.to_db_bytes()?,
            );
        }

        for remove in event_batch.account_removes {
            let queue_key = DeleteAccountQueueKey::new(remove.cursor);
            let queue_val: DeleteAccountQueueVal = remove.did;
            batch.insert(
                &self.queues,
                &queue_key.to_db_bytes()?,
                &queue_val.to_db_bytes()?,
            );
        }

        batch.insert(
            &self.global,
            DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?,
            latest.to_db_bytes()?,
        );

        batch.commit()?;
        Ok(())
    }

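    // Note: step_rollup below drains two work sources -- buffered live counts and the queued
    // account deletes -- in jetstream-cursor order, applying whichever comes first so the shared
    // rollup cursor only ever moves forward.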
    fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> {
        let mut dirty_nsids = HashSet::new();

        let rollup_cursor =
            get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
                StorageError::BadStateError("Could not find current rollup cursor".to_string()),
            )?;

        // timelies
        let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
        let mut timely_iter = self.rollups.range(live_counts_range).peekable();

        let timely_next = timely_iter
            .peek_mut()
            .map(|kv| -> StorageResult<LiveCountsKey> {
                match kv {
                    Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
                    Ok((key_bytes, _)) => {
                        let key = db_complete::<LiveCountsKey>(key_bytes)?;
                        Ok(key)
                    }
                }
            })
            .transpose()?;

        // delete accounts
        let delete_accounts_range =
            DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;

        let next_delete = self
            .queues
            .range(delete_accounts_range)
            .next()
            .transpose()?
            .map(|(key_bytes, val_bytes)| {
                db_complete::<DeleteAccountQueueKey>(&key_bytes)
                    .map(|k| (k.suffix, key_bytes, val_bytes))
            })
            .transpose()?;

        let cursors_stepped = match (timely_next, next_delete) {
            (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
                if timely.cursor() < delete_cursor {
                    let (n, dirty) = self.rollup_live_counts(
                        timely_iter,
                        Some(delete_cursor),
                        MAX_BATCHED_ROLLUP_COUNTS,
                    )?;
                    dirty_nsids.extend(dirty);
                    n
                } else {
                    self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
                }
            }
            (Some(_), None) => {
                let (n, dirty) =
                    self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?;
                dirty_nsids.extend(dirty);
                n
            }
            (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
                self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
            }
            (None, None) => 0,
        };

        Ok((cursors_stepped, dirty_nsids))
    }

    fn trim_collection(
        &mut self,
        collection: &Nsid,
        limit: usize,
        full_scan: bool,
    ) -> StorageResult<(usize, usize, bool)> {
        let mut dangling_feed_keys_cleaned = 0;
        let mut records_deleted = 0;

        let live_range = if full_scan {
            let start = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
            let end = NsidRecordFeedKey::prefix_range_end(collection)?;
            start..end
        } else {
            let feed_trim_cursor_key =
                TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?;
            let trim_cursor = self
                .global
                .get(&feed_trim_cursor_key)?
                .map(|value_bytes| db_complete(&value_bytes))
                .transpose()?
                .unwrap_or(Cursor::from_start());
            NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?
        };

        let mut live_records_found = 0;
        let mut candidate_new_feed_lower_cursor = None;
        let ended_early = false;
        let mut current_cursor: Option<Cursor> = None;
        for (i, kv) in self.feeds.range(live_range).rev().enumerate() {
            if i > 0 && i % 500_000 == 0 {
                log::info!(
                    "trim: at {i} for {:?} (now at {})",
                    collection.to_string(),
                    current_cursor
                        .map(|c| c
                            .elapsed()
                            .map(nice_duration)
                            .unwrap_or("[not past]".into()))
                        .unwrap_or("??".into()),
                );
            }
            let (key_bytes, val_bytes) = kv?;
            let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
            let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
            let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
            let location_key_bytes = location_key.to_db_bytes()?;

            let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
                // record was deleted (hopefully)
                self.feeds.remove(&*key_bytes)?;
                dangling_feed_keys_cleaned += 1;
                continue;
            };

            let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
            current_cursor = Some(meta.cursor());

            if meta.cursor() != feed_key.cursor() {
                // older/different version
                self.feeds.remove(&*key_bytes)?;
                dangling_feed_keys_cleaned += 1;
                continue;
            }
            if meta.rev != feed_val.rev() {
                // weird...
                log::warn!("record lookup: cursor match but rev did not...? removing.");
                self.records.remove(&location_key_bytes)?;
                self.feeds.remove(&*key_bytes)?;
                dangling_feed_keys_cleaned += 1;
                continue;
            }

            live_records_found += 1;
            if live_records_found <= limit {
                continue;
            }
            if candidate_new_feed_lower_cursor.is_none() {
                candidate_new_feed_lower_cursor = Some(feed_key.cursor());
            }

            // over the per-collection limit: drop the record itself and its feed entry
            self.records.remove(&location_key_bytes)?;
            self.feeds.remove(key_bytes)?;
            records_deleted += 1;
        }

        if !ended_early {
            if let Some(new_cursor) = candidate_new_feed_lower_cursor {
                self.global.insert(
                    &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?,
                    &new_cursor.to_db_bytes()?,
                )?;
            }
        }

        log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? {ended_early})");
        Ok((dangling_feed_keys_cleaned, records_deleted, ended_early))
    }

    fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {
        let mut records_deleted = 0;
        let mut batch = self.keyspace.batch();
        let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?;
        for kv in self.records.prefix(prefix) {
            let (key_bytes, _) = kv?;
            batch.remove(&self.records, key_bytes);
            records_deleted += 1;
            if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
                batch.commit()?;
                batch = self.keyspace.batch();
            }
        }
        batch.commit()?;
        Ok(records_deleted)
    }
}

pub struct FjallBackground(FjallWriter);

#[async_trait]
impl StoreBackground for FjallBackground {
    async fn run(mut self, backfill: bool) -> StorageResult<()> {
        let mut dirty_nsids = HashSet::new();

        // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims
        // shorter once those are done helps things catch up
        // the best setting for non-backfill is non-obvious.. it can be pretty slow and still be fine
        let mut rollup =
            tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 }));
        rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // backfill condition again iffy. collection trims should probably happen in their own phase.
        let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 }));
        trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = rollup.tick() => {
                    let mut db = self.0.clone();
                    let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??;
                    if n == 0 {
                        rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
                    }
                    dirty_nsids.extend(dirty);
                    log::trace!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
                },
                _ = trim.tick() => {
                    let n = dirty_nsids.len();
                    log::trace!("trimming {n} nsids: {dirty_nsids:?}");
                    let t0 = Instant::now();
                    let (mut total_danglers, mut total_deleted) = (0, 0);
                    let mut completed = HashSet::new();
                    for collection in &dirty_nsids {
                        let mut db = self.0.clone();
                        let c = collection.clone();
                        let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??;
                        total_danglers += danglers;
                        total_deleted += deleted;
                        if !ended_early {
                            completed.insert(collection.clone());
                        }
                        if total_deleted > 10_000_000 {
                            log::info!("trim stopped early, more than 10M records already deleted.");
                            break;
                        }
                    }
                    let dt = t0.elapsed();
                    log::trace!("finished trimming {n} nsids in {dt:?}: {total_danglers} dangling and {total_deleted} total removed.");
                    histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
                    histogram!("storage_trim_duration").record(dt.as_micros() as f64);
                    counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
                    counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
                    for c in completed {
                        dirty_nsids.remove(&c);
                    }
                },
            };
        }
    }
}

/// Get a value from a fixed key
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Get a value from a fixed key
fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
    global: &fjall::Snapshot,
) -> StorageResult<Option<V>> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value = global
        .get(&key_bytes)?
        .map(|value_bytes| db_complete(&value_bytes))
        .transpose()?;
    Ok(value)
}

/// Set a value to a fixed key
fn insert_static_neu<K: StaticStr>(
    global: &PartitionHandle,
    value: impl DbBytes,
) -> StorageResult<()> {
    let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
    let value_bytes = value.to_db_bytes()?;
    global.insert(&key_bytes, &value_bytes)?;
    Ok(())
}

/// Set a value to a fixed key, erroring if the value already exists
///
/// Intended for single-threaded init: not safe under concurrency, since there
/// is no transaction between checking if the value already exists and writing it.
1669fn init_static_neu<K: StaticStr>( 1670 global: &PartitionHandle, 1671 value: impl DbBytes, 1672) -> StorageResult<()> { 1673 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1674 if global.get(&key_bytes)?.is_some() { 1675 return Err(StorageError::InitError(format!( 1676 "init failed: value for key {key_bytes:?} already exists" 1677 ))); 1678 } 1679 let value_bytes = value.to_db_bytes()?; 1680 global.insert(&key_bytes, &value_bytes)?; 1681 Ok(()) 1682} 1683 1684/// Set a value to a fixed key 1685fn insert_batch_static_neu<K: StaticStr>( 1686 batch: &mut FjallBatch, 1687 global: &PartitionHandle, 1688 value: impl DbBytes, 1689) -> StorageResult<()> { 1690 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1691 let value_bytes = value.to_db_bytes()?; 1692 batch.insert(global, &key_bytes, &value_bytes); 1693 Ok(()) 1694} 1695 1696#[derive(Debug, serde::Serialize, schemars::JsonSchema)] 1697pub struct StorageInfo { 1698 pub keyspace_disk_space: u64, 1699 pub keyspace_journal_count: usize, 1700 pub keyspace_sequence: u64, 1701 pub global_approximate_len: usize, 1702} 1703 1704////////// temp stuff to remove: 1705 1706#[cfg(test)] 1707mod tests { 1708 use super::*; 1709 use crate::{DeleteAccount, RecordKey, UFOsCommit}; 1710 use jetstream::events::{CommitEvent, CommitOp}; 1711 use jetstream::exports::Cid; 1712 use serde_json::value::RawValue; 1713 1714 fn fjall_db() -> (FjallReader, FjallWriter) { 1715 let (read, write, _, _) = FjallStorage::init( 1716 tempfile::tempdir().unwrap(), 1717 "offline test (no real jetstream endpoint)".to_string(), 1718 false, 1719 FjallConfig { temp: true }, 1720 ) 1721 .unwrap(); 1722 (read, write) 1723 } 1724 1725 const TEST_BATCH_LIMIT: usize = 16; 1726 fn beginning() -> HourTruncatedCursor { 1727 Cursor::from_start().into() 1728 } 1729 1730 #[derive(Debug, Default)] 1731 struct TestBatch { 1732 pub batch: EventBatch<TEST_BATCH_LIMIT>, 1733 } 1734 1735 impl TestBatch { 1736 #[allow(clippy::too_many_arguments)] 1737 pub fn create( 1738 &mut self, 1739 did: &str, 1740 collection: &str, 1741 rkey: &str, 1742 record: &str, 1743 rev: Option<&str>, 1744 cid: Option<Cid>, 1745 cursor: u64, 1746 ) -> Nsid { 1747 let did = Did::new(did.to_string()).unwrap(); 1748 let collection = Nsid::new(collection.to_string()).unwrap(); 1749 let record = RawValue::from_string(record.to_string()).unwrap(); 1750 let cid = cid.unwrap_or( 1751 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1752 .parse() 1753 .unwrap(), 1754 ); 1755 1756 let event = CommitEvent { 1757 collection, 1758 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1759 rev: rev.unwrap_or("asdf").to_string(), 1760 operation: CommitOp::Create, 1761 record: Some(record), 1762 cid: Some(cid), 1763 }; 1764 1765 let (commit, collection) = 1766 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1767 .unwrap(); 1768 1769 self.batch 1770 .commits_by_nsid 1771 .entry(collection.clone()) 1772 .or_default() 1773 .truncating_insert(commit, &[0u8; 16]) 1774 .unwrap(); 1775 1776 collection 1777 } 1778 #[allow(clippy::too_many_arguments)] 1779 pub fn update( 1780 &mut self, 1781 did: &str, 1782 collection: &str, 1783 rkey: &str, 1784 record: &str, 1785 rev: Option<&str>, 1786 cid: Option<Cid>, 1787 cursor: u64, 1788 ) -> Nsid { 1789 let did = Did::new(did.to_string()).unwrap(); 1790 let collection = Nsid::new(collection.to_string()).unwrap(); 1791 let record = RawValue::from_string(record.to_string()).unwrap(); 1792 let cid = cid.unwrap_or( 1793 
"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1794 .parse() 1795 .unwrap(), 1796 ); 1797 1798 let event = CommitEvent { 1799 collection, 1800 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1801 rev: rev.unwrap_or("asdf").to_string(), 1802 operation: CommitOp::Update, 1803 record: Some(record), 1804 cid: Some(cid), 1805 }; 1806 1807 let (commit, collection) = 1808 UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1809 .unwrap(); 1810 1811 self.batch 1812 .commits_by_nsid 1813 .entry(collection.clone()) 1814 .or_default() 1815 .truncating_insert(commit, &[0u8; 16]) 1816 .unwrap(); 1817 1818 collection 1819 } 1820 #[allow(clippy::too_many_arguments)] 1821 pub fn delete( 1822 &mut self, 1823 did: &str, 1824 collection: &str, 1825 rkey: &str, 1826 rev: Option<&str>, 1827 cursor: u64, 1828 ) -> Nsid { 1829 let did = Did::new(did.to_string()).unwrap(); 1830 let collection = Nsid::new(collection.to_string()).unwrap(); 1831 let event = CommitEvent { 1832 collection, 1833 rkey: RecordKey::new(rkey.to_string()).unwrap(), 1834 rev: rev.unwrap_or("asdf").to_string(), 1835 operation: CommitOp::Delete, 1836 record: None, 1837 cid: None, 1838 }; 1839 1840 let (commit, collection) = 1841 UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap(); 1842 1843 self.batch 1844 .commits_by_nsid 1845 .entry(collection.clone()) 1846 .or_default() 1847 .truncating_insert(commit, &[0u8; 16]) 1848 .unwrap(); 1849 1850 collection 1851 } 1852 pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did { 1853 let did = Did::new(did.to_string()).unwrap(); 1854 self.batch.account_removes.push(DeleteAccount { 1855 did: did.clone(), 1856 cursor: Cursor::from_raw_u64(cursor), 1857 }); 1858 did 1859 } 1860 } 1861 1862 #[test] 1863 fn test_hello() -> anyhow::Result<()> { 1864 let (read, mut write) = fjall_db(); 1865 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1866 let JustCount { 1867 creates, 1868 dids_estimate, 1869 .. 1870 } = read.get_collection_counts( 1871 &Nsid::new("a.b.c".to_string()).unwrap(), 1872 beginning(), 1873 None, 1874 )?; 1875 assert_eq!(creates, 0); 1876 assert_eq!(dids_estimate, 0); 1877 Ok(()) 1878 } 1879 1880 #[test] 1881 fn test_insert_one() -> anyhow::Result<()> { 1882 let (read, mut write) = fjall_db(); 1883 1884 let mut batch = TestBatch::default(); 1885 let collection = batch.create( 1886 "did:plc:inze6wrmsm7pjl7yta3oig77", 1887 "a.b.c", 1888 "asdf", 1889 "{}", 1890 Some("rev-z"), 1891 None, 1892 100, 1893 ); 1894 write.insert_batch(batch.batch)?; 1895 write.step_rollup()?; 1896 1897 let JustCount { 1898 creates, 1899 dids_estimate, 1900 .. 1901 } = read.get_collection_counts(&collection, beginning(), None)?; 1902 assert_eq!(creates, 1); 1903 assert_eq!(dids_estimate, 1); 1904 let JustCount { 1905 creates, 1906 dids_estimate, 1907 .. 

    #[test]
    fn test_hello() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();
        write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?;
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.b.c".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        Ok(())
    }

    #[test]
    fn test_insert_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "asdf",
            "{}",
            Some("rev-z"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("d.e.f".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), "{}");
        assert!(!rec.is_update);

        let records = read.get_records_by_collections(
            [Nsid::new("d.e.f".to_string()).unwrap()].into(),
            2,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn test_get_multi_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa",
            r#""earliest""#,
            Some("rev-a"),
            None,
            100,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.b",
            "aab",
            r#""in between""#,
            Some("rev-ab"),
            None,
            101,
        );
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "aaa-2",
            r#""last""#,
            Some("rev-a-2"),
            None,
            102,
        );
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record.get(), r#""last""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );
        assert_eq!(records[1].record.get(), r#""in between""#);
        assert_eq!(
            records[1].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );
        assert_eq!(records[2].record.get(), r#""earliest""#);
        assert_eq!(
            records[2].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        Ok(())
    }
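
    // The third argument to `get_records_by_collections` toggles per-collection
    // expansion: the test above passes `false` and gets one merged, newest-first list,
    // while the test below passes `true` with a limit of 2 and gets two records from
    // each collection that has any (4 total). The precise semantics live in the reader
    // implementation; this note only describes what the assertions exercise.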

    #[test]
    fn test_get_multi_collection_expanded() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // insert some older ones in aab
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.b",
                &format!("aab-{i}"),
                &format!(r#""b {i}""#),
                Some(&format!("rev-b-{i}")),
                None,
                100 + i,
            );
        }
        // and some newer ones in aaa
        for i in 1..=3 {
            batch.create(
                "did:plc:inze6wrmsm7pjl7yta3oig77",
                "a.a.a",
                &format!("aaa-{i}"),
                &format!(r#""a {i}""#),
                Some(&format!("rev-a-{i}")),
                None,
                200 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([
                Nsid::new("a.a.a".to_string()).unwrap(),
                Nsid::new("a.a.b".to_string()).unwrap(),
                Nsid::new("a.a.c".to_string()).unwrap(),
            ]),
            2,
            true,
        )?;
        assert_eq!(records.len(), 4);
        assert_eq!(records[0].record.get(), r#""a 3""#);
        assert_eq!(
            records[0].collection,
            Nsid::new("a.a.a".to_string()).unwrap()
        );

        assert_eq!(records[3].record.get(), r#""b 2""#);
        assert_eq!(
            records[3].collection,
            Nsid::new("a.a.b".to_string()).unwrap()
        );

        Ok(())
    }

    #[test]
    fn test_update_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.update(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            r#"{"ch": "ch-ch-ch-changes"}"#,
            Some("rev-z"),
            None,
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 1);
        let rec = &records[0];
        assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
        assert!(rec.is_update);
        Ok(())
    }

    #[test]
    fn test_delete_one() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        let collection = batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            "{}",
            Some("rev-a"),
            None,
            100,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.b.c",
            "rkey-asdf",
            Some("rev-z"),
            101,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(&collection, beginning(), None)?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);

        let records = read.get_records_by_collections([collection].into(), 2, false)?;
        assert_eq!(records.len(), 0);

        Ok(())
    }
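
    // `trim_collection` takes a collection, a record limit, and a flag (`false` here,
    // as in the background loop above, which uses a limit of 512 and inspects the
    // returned (danglers, deleted, ended_early) tuple). The test below seeds 1/10/1
    // records across three collections, trims each (plus an empty one) to a limit of 6,
    // and checks that only the oversized collection shrinks.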

    #[test]
    fn test_collection_trim() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        let mut last_b_cursor;
        for i in 1..=10 {
            last_b_cursor = 11_000 + i;
            batch.create(
                &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
                "a.a.b",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                last_b_cursor,
            );
        }
        batch.create(
            "did:plc:inze6wrmsm7pjl7yta3oig77",
            "a.a.c",
            "rkey-ccc",
            "{}",
            Some("rev-ccc"),
            None,
            12_000,
        );

        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 10);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

        write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?;
        write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 6);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);
        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }

    #[test]
    fn test_delete_account() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        for i in 1..=2 {
            batch.create(
                "did:plc:person-b",
                "a.a.a",
                &format!("rkey-bbb-{i}"),
                &format!(r#"{{"n": {i}}}"#),
                Some(&format!("rev-bbb-{i}")),
                None,
                11_000 + i,
            );
        }
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 3);

        let records_deleted =
            write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?;
        assert_eq!(records_deleted, 2);

        let records = read.get_records_by_collections(
            HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]),
            100,
            false,
        )?;
        assert_eq!(records.len(), 1);

        Ok(())
    }

    #[test]
    fn rollup_delete_account_removes_record() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup
        write.insert_batch(batch.batch)?;

        write.step_rollup()?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

        Ok(())
    }
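
    // Rollup steps consume queued work in cursor order. The test below covers the two
    // remaining orderings: a delete-account queued *after* the already-rolled-up records
    // is applied on the next step, while one queued at a cursor older than the rollup
    // cursor is skipped entirely (the final `step_rollup()` reports 0 items).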

    #[test]
    fn rollup_delete_live_count_step() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 10_001);
        write.insert_batch(batch.batch)?;

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 1);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let records = read.get_records_by_collections(
            [Nsid::new("a.a.a".to_string()).unwrap()].into(),
            1,
            false,
        )?;
        assert_eq!(records.len(), 0);

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 9_999);
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    #[test]
    fn rollup_multiple_count_batches() -> anyhow::Result<()> {
        let (_read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aab",
            "{}",
            Some("rev-aab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 2);

        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }

    #[test]
    fn counts_before_and_after_rollup() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-b",
            "a.a.a",
            "rkey-bbb",
            "{}",
            Some("rev-bbb"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.delete_account("did:plc:person-a", 11_000);
        write.insert_batch(batch.batch)?;

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aac",
            "{}",
            Some("rev-aac"),
            None,
            12_000,
        );
        write.insert_batch(batch.batch)?;

        // before any rollup
        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);

        // first batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // delete account rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 2);

        // second batch rolled up
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 1);

        let JustCount {
            creates,
            dids_estimate,
            ..
        } = read.get_collection_counts(
            &Nsid::new("a.a.a".to_string()).unwrap(),
            beginning(),
            None,
        )?;
        assert_eq!(creates, 3);
        assert_eq!(dids_estimate, 2);

        // no more rollups left
        let (n, _) = write.step_rollup()?;
        assert_eq!(n, 0);

        Ok(())
    }
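
    // The remaining tests cover `get_prefix`, which reports rolled-up counts for an NSID
    // prefix. As the assertions below show: the exact collection named by the prefix and
    // lexicographic neighbours (e.g. "a.a.aa" for prefix "a.a.a") are excluded, direct
    // children come back as `PrefixChild::Collection`, and deeper descendants are merged
    // into `PrefixChild::Prefix` entries with combined counts and did estimates.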

    #[test]
    fn get_prefix_children_lexi_empty() {
        let (read, _) = fjall_db();
        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read
            .get_prefix(
                NsidPrefix::new("aaa.aaa").unwrap(),
                10,
                OrderCollectionsBy::Lexi { cursor: None },
                None,
                None,
            )
            .unwrap();

        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
    }

    #[test]
    fn get_prefix_excludes_exact_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_excludes_neighbour_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.aa",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 0);
        assert_eq!(dids_estimate, 0);
        assert_eq!(children, vec![]);
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_includes_child_collection() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Collection(NsidCount {
                nsid: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_includes_child_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 1);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 1,
                updates: 0,
                deletes: 0,
                dids_estimate: 1,
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }

    #[test]
    fn get_prefix_merges_child_prefixes() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_000,
        );
        batch.create(
            "did:plc:person-a",
            "a.a.a.b",
            "rkey-aaab",
            "{}",
            Some("rev-aaab"),
            None,
            10_001,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![PrefixChild::Prefix(PrefixCount {
                prefix: "a.a.a".to_string(),
                creates: 2,
                updates: 0,
                deletes: 0,
                dids_estimate: 1
            }),]
        );
        assert_eq!(cursor, None);
        Ok(())
    }
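
    // Finally, with records at "a.a.a" (exact), "a.a.a.a" (child), and "a.a.a.a.a"
    // (deeper), a query for the "a.a.a" prefix counts only the child and the deeper
    // record (creates == 2) and returns both a Collection and a Prefix child named
    // "a.a.a.a"; the exact collection's own record is excluded, consistent with the
    // exclusion tests above.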

    #[test]
    fn get_prefix_exact_and_child_and_prefix() -> anyhow::Result<()> {
        let (read, mut write) = fjall_db();

        let mut batch = TestBatch::default();
        // exact:
        batch.create(
            "did:plc:person-a",
            "a.a.a",
            "rkey-aaa",
            "{}",
            Some("rev-aaa"),
            None,
            10_000,
        );
        // child:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a",
            "rkey-aaaa",
            "{}",
            Some("rev-aaaa"),
            None,
            10_001,
        );
        // prefix:
        batch.create(
            "did:plc:person-a",
            "a.a.a.a.a",
            "rkey-aaaaa",
            "{}",
            Some("rev-aaaaa"),
            None,
            10_002,
        );
        write.insert_batch(batch.batch)?;
        write.step_rollup()?;

        let (
            JustCount {
                creates,
                dids_estimate,
                ..
            },
            children,
            cursor,
        ) = read.get_prefix(
            NsidPrefix::new("a.a.a").unwrap(),
            10,
            OrderCollectionsBy::Lexi { cursor: None },
            None,
            None,
        )?;
        assert_eq!(creates, 2);
        assert_eq!(dids_estimate, 1);
        assert_eq!(
            children,
            vec![
                PrefixChild::Collection(NsidCount {
                    nsid: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
                PrefixChild::Prefix(PrefixCount {
                    prefix: "a.a.a.a".to_string(),
                    creates: 1,
                    updates: 0,
                    deletes: 0,
                    dids_estimate: 1
                }),
            ]
        );
        assert_eq!(cursor, None);
        Ok(())
    }
}