Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
use crate::{CountsByCount, Did, RecordId};
use anyhow::{bail, Result};
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use ratelimit::Ratelimiter;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;

// Column-family names for the four rocksdb tables this storage uses.
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

// Key (in the default CF) under which the last-processed jetstream cursor is persisted.
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";

// todo: actually understand and set these options probably better
/// Base rocksdb options shared by the db itself and every column family.
fn rocks_opts_base() -> Options {
    let mut opts = Options::default();
    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.create_if_missing(true);
    opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
    // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
    opts
}
/// Options for opening the db in read-write mode.
fn get_db_opts() -> Options {
    let mut opts = rocks_opts_base();
    opts.create_missing_column_families(true);
    opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
    // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
    opts
}
/// Options for opening the db in read-only mode (point-lookup heavy workload).
fn get_db_read_opts() -> Options {
    let mut opts = Options::default();
    opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
    opts
}

/// Rocksdb-backed implementation of `LinkStorage`/`LinkReader`.
///
/// Clones share the same underlying db handle; `is_writer` marks which clone
/// is responsible for flush/cleanup on drop.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    did_id_table: IdTable<Did, DidIdValue, true>,
    target_id_table: IdTable<TargetKey, TargetId, false>,
    is_writer: bool,
    // JoinHandle of the (optional) backup thread, joined on drop by the writer.
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}

/// A value stored in an id table: wraps a u64 sequence number.
trait IdTableValue: ValueFromRocks + Clone {
    fn new(v: u64) -> Self;
    fn id(&self) -> u64;
}
/// Pre-init state of an id table: knows its CF name and holds the shared
/// public sequence counter (0 = uninitialized; real ids start at 1).
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    name: String,
    id_seq: Arc<AtomicU64>,
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    /// Finish setup against an opened db: verify our CF exists and restore the
    /// persisted sequence counter (next id = last persisted + 1, or 1 if fresh).
    fn init<const WITH_REVERSE: bool>(
        self,
        db: &DBWithThreadMode<MultiThreaded>,
    ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
            if seq_bytes.len() != 8 {
                bail!(
                    "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
                    self.seq_key()
                );
            }
            let mut buf: [u8; 8] = [0; 8];
            seq_bytes.as_slice().read_exact(&mut buf)?;
            let last_seq = u64::from_le_bytes(buf);
            last_seq + 1
        } else {
            1
        };
        self.id_seq.store(priv_id_seq, Ordering::SeqCst);
        Ok(IdTable {
            base: self,
            priv_id_seq,
        })
    }
    /// Default-CF key under which this table's sequence counter is persisted.
    fn seq_key(&self) -> Vec<u8> {
        let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
        k.extend(self.name.as_bytes());
        k
    }
}
/// Maps original keys (dids, target keys) to compact u64-backed ids.
/// `WITH_REVERSE` additionally maintains an id -> original mapping in the same CF.
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    // Writer-private copy of the sequence; kept in lockstep with base.id_seq.
    priv_id_seq: u64,
}
impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    #[must_use]
    fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
        IdTableBase::<Orig, IdVal> {
            _key_marker: PhantomData,
            _val_marker: PhantomData,
            name: name.into(),
            id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
        }
    }
    /// Look up the id value for an original key; None if never assigned.
    fn get_id_val(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        orig: &Orig,
    ) -> Result<Option<IdVal>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
            Ok(Some(_vr(&_id_bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Shared get-or-allocate: assigns the next sequence id and stages both the
    /// mapping and the updated seq counter into `batch`.
    /// NOTE(review): the new id is staged in `batch` but not visible to reads
    /// until the batch is written — assumes a single writer, no duplicate
    /// allocations within one batch for the same key. TODO confirm.
    fn __get_or_create_id_val<CF>(
        &mut self,
        cf: &CF,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal>
    where
        CF: AsColumnFamilyRef,
    {
        Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
            let prev_priv_seq = self.priv_id_seq;
            self.priv_id_seq += 1;
            let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
            assert_eq!(
                prev_public_seq, prev_priv_seq,
                "public seq may have been modified??"
            );
            let id_value = IdVal::new(self.priv_id_seq);
            batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
            batch.put_cf(cf, _rk(orig), _rv(&id_value));
            id_value
        }))
    }
    /// Approximate number of assigned ids (reads the public counter).
    /// NOTE(review): would underflow if called before `init` has stored >= 1.
    fn estimate_count(&self) -> u64 {
        self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
    }
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Get-or-allocate, also staging the reverse (id -> original) entry.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    /// Reverse lookup: original key from a numeric id (big-endian key in the same CF).
    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? {
            // HACK ish
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Get-or-allocate without a reverse mapping.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        self.__get_or_create_id_val(&cf, db, batch, orig)
    }
}

impl IdTableValue for DidIdValue {
    fn new(v: u64) -> Self {
        DidIdValue(DidId(v), true)
    }
    fn id(&self) -> u64 {
        self.0 .0
    }
}
impl IdTableValue for TargetId {
    fn new(v: u64) -> Self {
        TargetId(v)
    }
    fn id(&self) -> u64 {
        self.0
    }
}

impl RocksStorage {
    /// Open (or create) a read-write store and register metric descriptions.
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        Self::describe_metrics();
        RocksStorage::open_readmode(path, false)
    }

    /// Open an existing store read-only (no CF creation, point-lookup tuned).
    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
        RocksStorage::open_readmode(path, true)
    }

    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
        let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);

        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
        };

        let db = Arc::new(db);
        let did_id_table = did_id_table.init(&db)?;
        let target_id_table = target_id_table.init(&db)?;
        Ok(Self {
            db,
            did_id_table,
            target_id_table,
            is_writer: !readonly,
            backup_task: None.into(),
        })
    }

    /// Start either a periodic background backup task (`auto = Some((hours, copies))`)
    /// or a one-off backup thread, storing its handle for join-on-drop.
    /// NOTE(review): overwrites any previously stored task handle without joining it
    /// — assumes this is called at most once. TODO confirm.
    pub fn start_backup(
        &mut self,
        path: PathBuf,
        auto: Option<(u64, Option<usize>)>,
        stay_alive: CancellationToken,
    ) -> Result<()> {
        let task = if let Some((interval_hrs, copies)) = auto {
            eprintln!("backups: starting background task...");
            self.backup_task(path, interval_hrs, copies, stay_alive)
        } else {
            eprintln!("backups: starting a one-off backup...");
            thread::spawn({
                let db = self.db.clone();
                move || Self::do_backup(db, path)
            })
        };
        self.backup_task = Arc::new(Some(task));
        Ok(())
    }

    /// Spawn the periodic backup loop: rate-limited to one backup per
    /// `interval_hrs`, polling the cancellation token about once a second.
    fn backup_task(
        &self,
        path: PathBuf,
        interval_hrs: u64,
        copies: Option<usize>,
        stay_alive: CancellationToken,
    ) -> std::thread::JoinHandle<Result<()>> {
        let db = self.db.clone();
        thread::spawn(move || {
            let limit =
                Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
            let minimum_sleep = Duration::from_secs(1);

            'quit: loop {
                if let Err(sleep) = limit.try_wait() {
                    eprintln!("backups: background: next backup scheduled in {sleep:?}");
                    let waiting = Instant::now();
                    loop {
                        // NOTE(review): Duration subtraction panics on underflow; if
                        // elapsed ever exceeds `sleep` before this line runs, this
                        // panics rather than hitting the <= ZERO branch. TODO confirm.
                        let remaining = sleep - waiting.elapsed();
                        if stay_alive.is_cancelled() {
                            break 'quit;
                        } else if remaining <= Duration::ZERO {
                            break;
                        } else if remaining < minimum_sleep {
                            thread::sleep(remaining);
                            break;
                        } else {
                            thread::sleep(minimum_sleep);
                        }
                    }
                }
                eprintln!("backups: background: starting backup...");
                if let Err(e) = Self::do_backup(db.clone(), &path) {
                    eprintln!("backups: background: backup failed: {e:?}");
                    // todo: metrics
                } else {
                    eprintln!("backups: background: backup succeeded yay");
                }
                if let Some(copies) = copies {
                    eprintln!("backups: background: trimming to {copies} saved backups...");
                    if let Err(e) = Self::trim_backups(copies, &path) {
                        eprintln!("backups: background: failed to trim backups: {e:?}");
                    } else {
                        eprintln!("backups: background: trimming worked!")
                    }
                }
            }

            Ok(())
        })
    }

    /// Run one backup into `path`; logs but swallows backup errors (returns Ok).
    fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        eprintln!("backups: starting backup...");
        let t0 = Instant::now();
        if let Err(e) = engine.create_new_backup(&db) {
            eprintln!("backups: oh no, backup failed: {e:?}");
        } else {
            eprintln!("backups: yay, backup worked?");
        }
        eprintln!(
            "backups: backup finished after {:.2}s",
            t0.elapsed().as_secs_f32()
        );
        Ok(())
    }

    /// Drop all but the newest `num_backups_to_keep` backups at `path`.
    fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        engine.purge_old_backups(num_backups_to_keep)?;
        Ok(())
    }

    /// Register units/descriptions for the metrics emitted by `push`.
    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }

    /// Associative merge operator for TARGET_LINKERS_CF: concatenates serialized
    /// `TargetLinkers` vecs. Deserialization failures are logged and the bad
    /// payload is dropped (data loss) rather than aborting the merge.
    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }

    /// Iterate a CF over all keys starting with serialized `pre`, decoding
    /// keys/values and silently stopping at the first decode failure.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }

    /// Read-modify-write of a did's DidIdValue; returns false if the did is
    /// unknown or `update` declines to change it.
    fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
    where
        F: FnOnce(DidIdValue) -> Option<DidIdValue>,
    {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(false);
        };
        let Some(new_did_id_value) = update(did_id_value) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
        Ok(true)
    }
    /// Stage deletion of the forward did -> DidIdValue entry.
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }

    /// All linkers of a target; empty default if the target has no entry.
    fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
            return Ok(TargetLinkers::default());
        };
        _vr(&linkers_bytes)
    }
    /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
    fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let mut seen = HashSet::new();
        let mut linkers = self.get_target_linkers(target_id)?;
        for (did_id, _) in linkers.0.iter_mut() {
            if seen.contains(did_id) {
                did_id.0 = 0;
            } else {
                seen.insert(*did_id);
            }
        }
        Ok(linkers)
    }
    /// Stage a merge appending one (did, rkey) linker to a target's list.
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
    /// Read-modify-write of a target's linker list; returns false if `update`
    /// declines to change it.
    fn update_target_linkers<F>(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        update: F,
    ) -> Result<bool>
    where
        F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
    {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let existing_linkers = self.get_target_linkers(target_id)?;
        let Some(new_linkers) = update(existing_linkers) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
        Ok(true)
    }

    /// Stage the forward record -> targets entry (blind overwrite).
    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
    /// Fetch the forward record -> targets entry, if any.
    fn get_record_link_targets(
        &self,
        record_link_key: &RecordLinkKey,
    ) -> Result<Option<RecordLinkTargets>> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
            Ok(Some(_vr(&bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Stage deletion of a forward record -> targets entry.
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    /// All forward link entries created by one did (prefix scan on did id).
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    /// All (collection, path) target keys sharing one target string.
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }

    //
    // higher-level event action handlers
    //

    /// Record a new record's links: allocate ids as needed, merge each target's
    /// reverse-linker entry, and write the forward record -> targets entry.
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }

    /// Remove a record's links: tombstone its entries in each target's reverse
    /// list, then delete the forward entry. No-op for unknown dids/records.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }

    /// Flip a known did's active flag (silently ignores unknown dids).
    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }

    /// Delete an account and all its links. Link cleanup is written in separate
    /// 1024-entry mini-batches (directly, not via `batch`) to bound batch size;
    /// returns the number of ops written that way.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)?
else { 683 return Ok(total_batched_ops); // ignore updates for dids we don't know about 684 }; 685 self.delete_did_id_value(batch, did); 686 // TODO: also delete the reverse!! 687 688 // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size. 689 // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared. 690 // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully 691 // TODO: queue a background delete task or whatever 692 // TODO: test delete account with more links than chunk size 693 let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect(); 694 for chunk in stuff.chunks(1024) { 695 let mut mini_batch = WriteBatch::default(); 696 697 for (record_link_key, links) in chunk { 698 self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better 699 700 for RecordLinkTarget(_, target_link_id) in links.0.iter() { 701 self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| { 702 if !linkers.remove_linker(&did_id, &record_link_key.2) { 703 eprintln!("bug? 
could not find linker when removing links while deleting an account"); 704 } 705 Some(linkers) 706 })?; 707 } 708 } 709 total_batched_ops += mini_batch.len(); 710 self.db.write(mini_batch)?; // todo 711 } 712 Ok(total_batched_ops) 713 } 714} 715 716impl Drop for RocksStorage { 717 fn drop(&mut self) { 718 if self.is_writer { 719 println!("rocksdb writer: cleaning up for shutdown..."); 720 if let Err(e) = self.db.flush_wal(true) { 721 eprintln!("rocks: flushing wal failed: {e:?}"); 722 } 723 if let Err(e) = self.db.flush_opt(&{ 724 let mut opt = rocksdb::FlushOptions::default(); 725 opt.set_wait(true); 726 opt 727 }) { 728 eprintln!("rocks: flushing memtables failed: {e:?}"); 729 } 730 match Arc::get_mut(&mut self.backup_task) { 731 Some(maybe_task) => { 732 if let Some(task) = maybe_task.take() { 733 eprintln!("waiting for backup task to complete..."); 734 if let Err(e) = task.join() { 735 eprintln!("failed to join backup task: {e:?}"); 736 } 737 } 738 } 739 None => eprintln!("rocks: failed to get backup task, likely a bug."), 740 } 741 self.db.cancel_all_background_work(true); 742 } 743 } 744} 745 746impl AsRocksValue for u64 {} 747impl ValueFromRocks for u64 {} 748 749impl LinkStorage for RocksStorage { 750 fn get_cursor(&mut self) -> Result<Option<u64>> { 751 self.db 752 .get(JETSTREAM_CURSOR_KEY)? 
753 .map(|b| _vr(&b)) 754 .transpose() 755 } 756 757 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> { 758 // normal ops 759 let mut batch = WriteBatch::default(); 760 let t0 = Instant::now(); 761 if let Some(action) = match event { 762 ActionableEvent::CreateLinks { record_id, links } => { 763 self.add_links(record_id, links, &mut batch)?; 764 Some("create_links") 765 } 766 ActionableEvent::UpdateLinks { 767 record_id, 768 new_links, 769 } => { 770 self.remove_links(record_id, &mut batch)?; 771 self.add_links(record_id, new_links, &mut batch)?; 772 Some("update_links") 773 } 774 ActionableEvent::DeleteRecord(record_id) => { 775 self.remove_links(record_id, &mut batch)?; 776 Some("delete_record") 777 } 778 ActionableEvent::ActivateAccount(did) => { 779 self.set_account(did, true, &mut batch)?; 780 Some("set_account_status") 781 } 782 ActionableEvent::DeactivateAccount(did) => { 783 self.set_account(did, false, &mut batch)?; 784 Some("set_account_status") 785 } 786 ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially 787 } { 788 let t_read = t0.elapsed(); 789 batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor)); 790 let batch_ops = batch.len(); 791 self.db.write(batch)?; 792 let t_total = t0.elapsed(); 793 794 histogram!("storage_rocksdb_read_seconds", "action" => action) 795 .record(t_read.as_secs_f64()); 796 histogram!("storage_rocksdb_action_seconds", "action" => action) 797 .record(t_total.as_secs_f64()); 798 counter!("storage_rocksdb_batch_ops_total", "action" => action) 799 .increment(batch_ops as u64); 800 } 801 802 // special metrics for account deletion which can be arbitrarily expensive 803 let mut outer_batch = WriteBatch::default(); 804 let t0 = Instant::now(); 805 if let ActionableEvent::DeleteAccount(did) = event { 806 let inner_batch_ops = self.delete_account(did, &mut outer_batch)?; 807 let total_batch_ops = inner_batch_ops + outer_batch.len(); 808 self.db.write(outer_batch)?; 809 let t_total = 
t0.elapsed(); 810 811 histogram!("storage_rocksdb_action_seconds", "action" => "delete_account") 812 .record(t_total.as_secs_f64()); 813 counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account") 814 .increment(total_batch_ops as u64); 815 histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64); 816 } 817 818 Ok(()) 819 } 820 821 fn to_readable(&mut self) -> impl LinkReader { 822 let mut readable = self.clone(); 823 readable.is_writer = false; 824 readable 825 } 826} 827 828impl LinkReader for RocksStorage { 829 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 830 let target_key = TargetKey( 831 Target(target.to_string()), 832 Collection(collection.to_string()), 833 RPath(path.to_string()), 834 ); 835 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 836 let (alive, _) = self.get_target_linkers(&target_id)?.count(); 837 Ok(alive) 838 } else { 839 Ok(0) 840 } 841 } 842 843 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 844 let target_key = TargetKey( 845 Target(target.to_string()), 846 Collection(collection.to_string()), 847 RPath(path.to_string()), 848 ); 849 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 850 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids()) 851 } else { 852 Ok(0) 853 } 854 } 855 856 fn get_links( 857 &self, 858 target: &str, 859 collection: &str, 860 path: &str, 861 limit: u64, 862 until: Option<u64>, 863 filter_dids: &HashSet<Did>, 864 ) -> Result<PagedAppendingCollection<RecordId>> { 865 let target_key = TargetKey( 866 Target(target.to_string()), 867 Collection(collection.to_string()), 868 RPath(path.to_string()), 869 ); 870 871 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
else { 872 return Ok(PagedAppendingCollection { 873 version: (0, 0), 874 items: Vec::new(), 875 next: None, 876 total: 0, 877 }); 878 }; 879 880 let mut linkers = self.get_target_linkers(&target_id)?; 881 if !filter_dids.is_empty() { 882 let mut did_filter = HashSet::new(); 883 for did in filter_dids { 884 let Some(DidIdValue(did_id, active)) = 885 self.did_id_table.get_id_val(&self.db, did)? 886 else { 887 eprintln!("failed to find a did_id for {did:?}"); 888 continue; 889 }; 890 if !active { 891 eprintln!("excluding inactive did from filtered results"); 892 continue; 893 } 894 did_filter.insert(did_id); 895 } 896 linkers.0.retain(|linker| did_filter.contains(&linker.0)); 897 } 898 899 let (alive, gone) = linkers.count(); 900 let total = alive + gone; 901 let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 902 let begin = end.saturating_sub(limit as usize); 903 let next = if begin == 0 { None } else { Some(begin as u64) }; 904 905 let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 906 907 let mut items = Vec::with_capacity(did_id_rkeys.len()); 908 // TODO: use get-many (or multi-get or whatever it's called) 909 for (did_id, rkey) in did_id_rkeys { 910 if did_id.is_empty() { 911 continue; 912 } 913 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 914 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 
915 else { 916 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 917 continue; 918 }; 919 if !active { 920 continue; 921 } 922 items.push(RecordId { 923 did, 924 collection: collection.to_string(), 925 rkey: rkey.0.clone(), 926 }); 927 } else { 928 eprintln!("failed to look up did from did_id {did_id:?}"); 929 } 930 } 931 932 Ok(PagedAppendingCollection { 933 version: (total, gone), 934 items, 935 next, 936 total: alive, 937 }) 938 } 939 940 fn get_distinct_dids( 941 &self, 942 target: &str, 943 collection: &str, 944 path: &str, 945 limit: u64, 946 until: Option<u64>, 947 ) -> Result<PagedAppendingCollection<Did>> { 948 let target_key = TargetKey( 949 Target(target.to_string()), 950 Collection(collection.to_string()), 951 RPath(path.to_string()), 952 ); 953 954 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 955 return Ok(PagedAppendingCollection { 956 version: (0, 0), 957 items: Vec::new(), 958 next: None, 959 total: 0, 960 }); 961 }; 962 963 let linkers = self.get_distinct_target_linkers(&target_id)?; 964 965 let (alive, gone) = linkers.count(); 966 let total = alive + gone; 967 let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 968 let begin = end.saturating_sub(limit as usize); 969 let next = if begin == 0 { None } else { Some(begin as u64) }; 970 971 let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 972 973 let mut items = Vec::with_capacity(did_id_rkeys.len()); 974 // TODO: use get-many (or multi-get or whatever it's called) 975 for (did_id, _) in did_id_rkeys { 976 if did_id.is_empty() { 977 continue; 978 } 979 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 980 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 
981 else { 982 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 983 continue; 984 }; 985 if !active { 986 continue; 987 } 988 items.push(did); 989 } else { 990 eprintln!("failed to look up did from did_id {did_id:?}"); 991 } 992 } 993 994 Ok(PagedAppendingCollection { 995 version: (total, gone), 996 items, 997 next, 998 total: alive, 999 }) 1000 } 1001 1002 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> { 1003 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new(); 1004 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) { 1005 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key; 1006 let (count, _) = self.get_target_linkers(&target_id)?.count(); 1007 out.entry(collection.into()) 1008 .or_default() 1009 .insert(path.clone(), count); 1010 } 1011 Ok(out) 1012 } 1013 1014 fn get_all_counts( 1015 &self, 1016 target: &str, 1017 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> { 1018 let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new(); 1019 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) { 1020 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key; 1021 let target_linkers = self.get_target_linkers(&target_id)?; 1022 let (records, _) = target_linkers.count(); 1023 let distinct_dids = target_linkers.count_distinct_dids(); 1024 out.entry(collection.into()).or_default().insert( 1025 path.clone(), 1026 CountsByCount { 1027 records, 1028 distinct_dids, 1029 }, 1030 ); 1031 } 1032 Ok(out) 1033 } 1034 1035 fn get_stats(&self) -> Result<StorageStats> { 1036 let dids = self.did_id_table.estimate_count(); 1037 let targetables = self.target_id_table.estimate_count(); 1038 let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 1039 let linking_records = self 1040 .db 1041 .property_value_cf(&lr_cf, 
rocksdb::properties::ESTIMATE_NUM_KEYS)?
        .map(|s| s.parse::<u64>())
        .transpose()?
        // Missing property (e.g. empty CF) is reported as zero records.
        .unwrap_or(0);
    Ok(StorageStats {
        dids,
        targetables,
        linking_records,
    })
}
}

// Marker traits describing the role a type plays in a rocksdb column family.
// Everything is (de)serialized with the big-endian bincode config below; a
// serialized AsRocksKeyPrefix<K> must therefore be an exact byte-prefix of
// the corresponding serialized K for prefix iteration to work (see the
// rocks_prefix_iteration_helper test).
trait AsRocksKey: Serialize {}
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}

// did_id table: Did -> DidIdValue
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp (per original note): extra impls for did_id-keyed lookups.
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table: TargetKey -> TargetId
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// target_links table: TargetId -> TargetLinkers (the reverse index)
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table: RecordLinkKey -> RecordLinkTargets (the forward index)
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}

/// Shared bincode config for all rocks keys/values: big-endian so that
/// serialized numeric prefixes sort byte-lexicographically in numeric order
/// (happier db -- numeric prefixes in lsm).
pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian()
}
// key -> bincode bytes
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
// key prefix -> bincode bytes, for prefix iteration over keys of type K
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
// value -> bincode bytes
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
// bincode bytes -> key
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
// bincode bytes -> value
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}

// Newtype wrappers for the pieces of a link's identity. They derive
// Serialize/Deserialize, so declaration/field order doubles as the on-disk
// key layout under the big-endian bincode config above.

/// NSID of the collection containing a linking record (e.g. "a.b.c" in tests).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

/// Path within the record at which the link was found (e.g. ".uri").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

/// Record key of a linking record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    /// The empty rkey doubles as the tombstone marker in `TargetLinkers`.
    fn empty() -> Self {
        RKey("".to_string())
    }
}

// did ids

/// Compact numeric id assigned to a Did; id 0 is reserved as the tombstone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    /// The reserved tombstone id.
    fn empty() -> Self {
        DidId(0)
    }
    /// True iff this is the tombstone id (0).
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

/// Value stored for a Did: its assigned DidId plus whether the account is
/// currently active.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}

// target ids

/// Compact numeric id assigned to a TargetKey.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

/// Every (DidId, RKey) that links to one target. Deleted links are tombstoned
/// in place as (DidId(0), RKey("")), keeping indices stable for paging.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);

impl TargetLinkers {
    /// Tombstone the most recently appended entry matching (did, rkey).
    /// Returns whether a matching entry was found.
    fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
        // Compare fields directly rather than building an owned (DidId, RKey)
        // per candidate: the previous `**d == (*did, rkey.clone())` cloned the
        // rkey String on every comparison. Tuple equality is field-wise, so
        // this is behaviorally identical.
        if let Some(entry) = self.0.iter_mut().rfind(|d| d.0 == *did && d.1 == *rkey) {
            *entry = (DidId::empty(), RKey::empty());
            true
        } else {
            false
        }
    }
    /// Returns (live linkers, tombstoned links).
    pub fn count(&self) -> (u64, u64) {
        // (linkers, deleted links)
        let total = self.0.len() as u64;
        let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as
u64;
        let gone = total - alive;
        (alive, gone)
    }
    /// Number of unique, non-tombstoned DidIds in this linker list.
    fn count_distinct_dids(&self) -> u64 {
        self.0
            .iter()
            .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
            .collect::<HashSet<_>>()
            .len() as u64
    }
}

// forward links to targets so we can delete links
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// Prefix type for iterating every RecordLinkKey belonging to one DidId.
// NOTE(review): relies on bincode serializing this single-field struct to an
// exact byte-prefix of the full RecordLinkKey (original comment: "does this
// even work????"). rocks_prefix_iteration_helper below exercises the
// mechanism, though not this exact key shape -- see the TODO at the bottom
// of the tests module.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

// Prefix type for iterating every TargetKey with a given Target.
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

/// All targets referenced from one record (the forward-index value).
#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}

#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    // Regression: account deletion walks the deleted account's RecordLinkKeys
    // by DidId prefix; the buggy version exploded when another account's keys
    // followed the deleted account's in iteration order.
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    // Verifies the bincode-key prefix-iteration helper: iterating with
    // KeyPrefix(0x02) must yield exactly the 256 Key(0x02, _) entries, in
    // order, and nothing from the neighbouring 0x01/0x03 prefixes.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        // Wire the local test types into the marker-trait machinery.
        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        // Every second-byte value 0x00..=0xFF must be seen exactly once, in order.
        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}