//! Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
use crate::{CountsByCount, Did, RecordId};
use anyhow::{bail, Result};
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use ratelimit::Ratelimiter;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;

// Column family names. Note the name/constant mismatch on the third one:
// the CF is literally called "target_links" even though the constant says LINKERS.
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

// Key (in the default CF) under which the last-consumed jetstream cursor is stored.
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";

// todo: actually understand and set these options probably better
/// Base rocksdb options shared by the db itself and every column family descriptor.
fn rocks_opts_base() -> Options {
    let mut opts = Options::default();
    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.create_if_missing(true);
    opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
    // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
    opts
}

/// Options for opening the db read-write (the writer process).
fn get_db_opts() -> Options {
    let mut opts = rocks_opts_base();
    opts.create_missing_column_families(true);
    opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
    // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
    opts
}

/// Options for opening the db read-only (point-lookup-heavy readers).
fn get_db_read_opts() -> Options {
    let mut opts = Options::default();
    opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
    opts
}

/// Rocksdb-backed link store. Cloneable: clones share the same `Arc<DB>`;
/// `is_writer` marks the one instance responsible for shutdown flushing.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    did_id_table: IdTable<Did, DidIdValue, true>,
    target_id_table: IdTable<TargetKey, TargetId, false>,
    is_writer: bool,
    // JoinHandle of the (optional) backup thread; joined on Drop by the writer.
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}

/// A value stored in an id table: wraps a u64 sequence number.
trait IdTableValue: ValueFromRocks + Clone {
    fn new(v: u64) -> Self;
    fn id(&self) -> u64;
}

/// Pre-`init` handle for an id-assignment table: maps `Orig` keys to
/// sequence-numbered `IdVal`s in a dedicated column family.
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    name: String, // doubles as the column family name
    id_seq: Arc<AtomicU64>, // publicly visible copy of the id sequence (0 = uninitialized)
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    /// Descriptor to open this table's column family with.
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    /// Finish setup against an opened db: verify the CF exists and restore the
    /// persisted id sequence (stored little-endian under `seq_key`).
    fn init<const WITH_REVERSE: bool>(
        self,
        db: &DBWithThreadMode<MultiThreaded>,
    ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())?
{ 96 if seq_bytes.len() != 8 { 97 bail!( 98 "reading bytes for u64 id seq {:?}: found the wrong number of bytes", 99 self.seq_key() 100 ); 101 } 102 let mut buf: [u8; 8] = [0; 8]; 103 seq_bytes.as_slice().read_exact(&mut buf)?; 104 let last_seq = u64::from_le_bytes(buf); 105 last_seq + 1 106 } else { 107 1 108 }; 109 self.id_seq.store(priv_id_seq, Ordering::SeqCst); 110 Ok(IdTable { 111 base: self, 112 priv_id_seq, 113 }) 114 } 115 fn seq_key(&self) -> Vec<u8> { 116 let mut k = b"__id_seq_key_plz_be_unique:".to_vec(); 117 k.extend(self.name.as_bytes()); 118 k 119 } 120} 121#[derive(Debug, Clone)] 122struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool> 123where 124 Orig: KeyFromRocks, 125 for<'a> &'a Orig: AsRocksKey, 126{ 127 base: IdTableBase<Orig, IdVal>, 128 priv_id_seq: u64, 129} 130impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE> 131where 132 Orig: KeyFromRocks, 133 for<'v> &'v IdVal: AsRocksValue, 134 for<'k> &'k Orig: AsRocksKey, 135{ 136 #[must_use] 137 fn setup(name: &str) -> IdTableBase<Orig, IdVal> { 138 IdTableBase::<Orig, IdVal> { 139 _key_marker: PhantomData, 140 _val_marker: PhantomData, 141 name: name.into(), 142 id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1 143 } 144 } 145 fn get_id_val( 146 &self, 147 db: &DBWithThreadMode<MultiThreaded>, 148 orig: &Orig, 149 ) -> Result<Option<IdVal>> { 150 let cf = db.cf_handle(&self.base.name).unwrap(); 151 if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? 
{ 152 Ok(Some(_vr(&_id_bytes)?)) 153 } else { 154 Ok(None) 155 } 156 } 157 fn __get_or_create_id_val<CF>( 158 &mut self, 159 cf: &CF, 160 db: &DBWithThreadMode<MultiThreaded>, 161 batch: &mut WriteBatch, 162 orig: &Orig, 163 ) -> Result<IdVal> 164 where 165 CF: AsColumnFamilyRef, 166 { 167 Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| { 168 let prev_priv_seq = self.priv_id_seq; 169 self.priv_id_seq += 1; 170 let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst); 171 assert_eq!( 172 prev_public_seq, prev_priv_seq, 173 "public seq may have been modified??" 174 ); 175 let id_value = IdVal::new(self.priv_id_seq); 176 batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes()); 177 batch.put_cf(cf, _rk(orig), _rv(&id_value)); 178 id_value 179 })) 180 } 181 fn estimate_count(&self) -> u64 { 182 self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved 183 } 184} 185impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true> 186where 187 Orig: KeyFromRocks, 188 for<'v> &'v IdVal: AsRocksValue, 189 for<'k> &'k Orig: AsRocksKey, 190{ 191 fn get_or_create_id_val( 192 &mut self, 193 db: &DBWithThreadMode<MultiThreaded>, 194 batch: &mut WriteBatch, 195 orig: &Orig, 196 ) -> Result<IdVal> { 197 let cf = db.cf_handle(&self.base.name).unwrap(); 198 let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?; 199 // TODO: assert that the original is never a u64 that could collide 200 batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/ 201 Ok(id_val) 202 } 203 204 fn get_val_from_id( 205 &self, 206 db: &DBWithThreadMode<MultiThreaded>, 207 id: u64, 208 ) -> Result<Option<Orig>> { 209 let cf = db.cf_handle(&self.base.name).unwrap(); 210 if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? 
        {
            // HACK ish
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Get-or-assign without a reverse mapping (forward entry only).
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        self.__get_or_create_id_val(&cf, db, batch, orig)
    }
}

impl IdTableValue for DidIdValue {
    // new did-id values start out marked "active" (the bool field)
    fn new(v: u64) -> Self {
        DidIdValue(DidId(v), true)
    }
    fn id(&self) -> u64 {
        self.0 .0
    }
}
impl IdTableValue for TargetId {
    fn new(v: u64) -> Self {
        TargetId(v)
    }
    fn id(&self) -> u64 {
        self.0
    }
}

impl RocksStorage {
    /// Open (or create) the db read-write and register metric descriptions.
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        Self::describe_metrics();
        RocksStorage::open_readmode(path, false)
    }

    /// Open an existing db read-only (no CF creation, point-lookup-tuned options).
    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
        RocksStorage::open_readmode(path, true)
    }

    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
        let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);

        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                // merge operator appends new (DidId, RKey) linker pairs to the existing list
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
285 } else { 286 DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)? 287 }; 288 289 let db = Arc::new(db); 290 let did_id_table = did_id_table.init(&db)?; 291 let target_id_table = target_id_table.init(&db)?; 292 Ok(Self { 293 db, 294 did_id_table, 295 target_id_table, 296 is_writer: !readonly, 297 backup_task: None.into(), 298 }) 299 } 300 301 pub fn start_backup( 302 &mut self, 303 path: PathBuf, 304 auto: Option<(u64, Option<usize>)>, 305 stay_alive: CancellationToken, 306 ) -> Result<()> { 307 let task = if let Some((interval_hrs, copies)) = auto { 308 eprintln!("backups: starting background task..."); 309 self.backup_task(path, interval_hrs, copies, stay_alive) 310 } else { 311 eprintln!("backups: starting a one-off backup..."); 312 thread::spawn({ 313 let db = self.db.clone(); 314 move || Self::do_backup(db, path) 315 }) 316 }; 317 self.backup_task = Arc::new(Some(task)); 318 Ok(()) 319 } 320 321 fn backup_task( 322 &self, 323 path: PathBuf, 324 interval_hrs: u64, 325 copies: Option<usize>, 326 stay_alive: CancellationToken, 327 ) -> std::thread::JoinHandle<Result<()>> { 328 let db = self.db.clone(); 329 thread::spawn(move || { 330 let limit = 331 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?; 332 let minimum_sleep = Duration::from_secs(1); 333 334 'quit: loop { 335 if let Err(sleep) = limit.try_wait() { 336 eprintln!("backups: background: next backup scheduled in {sleep:?}"); 337 let waiting = Instant::now(); 338 loop { 339 let remaining = sleep - waiting.elapsed(); 340 if stay_alive.is_cancelled() { 341 break 'quit; 342 } else if remaining <= Duration::ZERO { 343 break; 344 } else if remaining < minimum_sleep { 345 thread::sleep(remaining); 346 break; 347 } else { 348 thread::sleep(minimum_sleep); 349 } 350 } 351 } 352 eprintln!("backups: background: starting backup..."); 353 if let Err(e) = Self::do_backup(db.clone(), &path) { 354 eprintln!("backups: background: backup failed: {e:?}"); 355 // todo: metrics 356 
} else { 357 eprintln!("backups: background: backup succeeded yay"); 358 } 359 if let Some(copies) = copies { 360 eprintln!("backups: background: trimming to {copies} saved backups..."); 361 if let Err(e) = Self::trim_backups(copies, &path) { 362 eprintln!("backups: background: failed to trim backups: {e:?}"); 363 } else { 364 eprintln!("backups: background: trimming worked!") 365 } 366 } 367 } 368 369 Ok(()) 370 }) 371 } 372 373 fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> { 374 let mut engine = 375 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?; 376 eprintln!("backups: starting backup..."); 377 let t0 = Instant::now(); 378 if let Err(e) = engine.create_new_backup(&db) { 379 eprintln!("backups: oh no, backup failed: {e:?}"); 380 } else { 381 eprintln!("backups: yay, backup worked?"); 382 } 383 eprintln!( 384 "backups: backup finished after {:.2}s", 385 t0.elapsed().as_secs_f32() 386 ); 387 Ok(()) 388 } 389 390 fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> { 391 let mut engine = 392 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?; 393 engine.purge_old_backups(num_backups_to_keep)?; 394 Ok(()) 395 } 396 397 fn describe_metrics() { 398 describe_histogram!( 399 "storage_rocksdb_read_seconds", 400 Unit::Seconds, 401 "duration of the read stage of actions" 402 ); 403 describe_histogram!( 404 "storage_rocksdb_action_seconds", 405 Unit::Seconds, 406 "duration of read + write of actions" 407 ); 408 describe_counter!( 409 "storage_rocksdb_batch_ops_total", 410 Unit::Count, 411 "total batched operations from actions" 412 ); 413 describe_histogram!( 414 "storage_rocksdb_delete_account_ops", 415 Unit::Count, 416 "total batched ops for account deletions" 417 ); 418 } 419 420 fn merge_op_extend_did_ids( 421 key: &[u8], 422 existing: Option<&[u8]>, 423 operands: &MergeOperands, 424 ) -> Option<Vec<u8>> { 425 let mut linkers: Vec<_> = if 
        let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    // best-effort: keep merging even though the base value is lost
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }

    /// Iterate all (key, value) pairs in `cf` whose serialized key starts with
    /// prefix `pre`. NOTE: map_while stops at the first iterator error or the
    /// first undeserializable entry -- later entries are silently skipped.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }

    /// Read-modify-write a did's DidIdValue. Returns false if the did is
    /// unknown or `update` declined (returned None); the write is batched.
    fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
    where
        F: FnOnce(DidIdValue) -> Option<DidIdValue>,
    {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(false);
        };
        let Some(new_did_id_value) = update(did_id_value) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
        Ok(true)
    }
    /// Batch deletion of the forward did -> DidIdValue entry.
    /// NOTE(review): does not touch the reverse (id -> did) entry -- see the
    /// TODO in delete_account.
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }

    /// All linkers recorded for a target; empty list if the target is unknown.
    fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
            return Ok(TargetLinkers::default());
        };
        _vr(&linkers_bytes)
    }
    /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
    fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let mut seen = HashSet::new();
        let mut linkers = self.get_target_linkers(target_id)?;
        for (did_id, _) in linkers.0.iter_mut() {
            if seen.contains(did_id) {
                did_id.0 = 0; // DidId(0) is the "empty" sentinel, skipped by readers
            } else {
                seen.insert(*did_id);
            }
        }
        Ok(linkers)
    }
    /// Append one (did, rkey) linker to a target via the CF's merge operator.
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
    /// Read-modify-write a target's linker list; returns false if `update`
    /// declined. The put is batched (blind write, not a merge).
    fn update_target_linkers<F>(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        update: F,
    ) -> Result<bool>
    where
        F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
    {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let existing_linkers = self.get_target_linkers(target_id)?;
        let Some(new_linkers) = update(existing_linkers) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
        Ok(true)
    }

    fn put_link_targets(
        &self,
        batch:
        &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
    /// Forward lookup: the targets a (did, collection, rkey) record links to.
    fn get_record_link_targets(
        &self,
        record_link_key: &RecordLinkKey,
    ) -> Result<Option<RecordLinkTargets>> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
            Ok(Some(_vr(&bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Batch removal of a record's forward-link entry.
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    /// All forward-link entries created by one did (prefix scan on DidId).
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    /// All (collection, path) variants recorded for one target string.
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }

    //
    // higher-level event action handlers
    //

    /// Record a new record's links: assign ids as needed, merge each target's
    /// reverse (linker) entry, and blind-write the forward entry.
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }

    /// Remove a deleted record's links from both the forward entry and each
    /// linked target's reverse entry.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let
        Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }

    /// Mark a did active/inactive, preserving its assigned DidId.
    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }

    /// Remove a did and every link it created. Link cleanup is written in
    /// separate 1024-entry mini-batches (committed here, immediately); only
    /// the did-id deletion rides on the caller's `batch`. Returns the number
    /// of ops written via mini-batches.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
}

impl Drop for RocksStorage {
    // Only the writer instance flushes and joins the backup thread; read-only
    // clones (see to_readable) drop silently.
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            // Arc::get_mut only succeeds when no other clone still holds the
            // backup-task Arc (e.g. a live reader clone).
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}

// the jetstream cursor is stored as a bincoded u64
impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}

impl LinkStorage for RocksStorage {
    fn get_cursor(&mut self) -> Result<Option<u64>> {
        self.db
            .get(JETSTREAM_CURSOR_KEY)?
753 .map(|b| _vr(&b)) 754 .transpose() 755 } 756 757 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> { 758 // normal ops 759 let mut batch = WriteBatch::default(); 760 let t0 = Instant::now(); 761 if let Some(action) = match event { 762 ActionableEvent::CreateLinks { record_id, links } => { 763 self.add_links(record_id, links, &mut batch)?; 764 Some("create_links") 765 } 766 ActionableEvent::UpdateLinks { 767 record_id, 768 new_links, 769 } => { 770 self.remove_links(record_id, &mut batch)?; 771 self.add_links(record_id, new_links, &mut batch)?; 772 Some("update_links") 773 } 774 ActionableEvent::DeleteRecord(record_id) => { 775 self.remove_links(record_id, &mut batch)?; 776 Some("delete_record") 777 } 778 ActionableEvent::ActivateAccount(did) => { 779 self.set_account(did, true, &mut batch)?; 780 Some("set_account_status") 781 } 782 ActionableEvent::DeactivateAccount(did) => { 783 self.set_account(did, false, &mut batch)?; 784 Some("set_account_status") 785 } 786 ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially 787 } { 788 let t_read = t0.elapsed(); 789 batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor)); 790 let batch_ops = batch.len(); 791 self.db.write(batch)?; 792 let t_total = t0.elapsed(); 793 794 histogram!("storage_rocksdb_read_seconds", "action" => action) 795 .record(t_read.as_secs_f64()); 796 histogram!("storage_rocksdb_action_seconds", "action" => action) 797 .record(t_total.as_secs_f64()); 798 counter!("storage_rocksdb_batch_ops_total", "action" => action) 799 .increment(batch_ops as u64); 800 } 801 802 // special metrics for account deletion which can be arbitrarily expensive 803 let mut outer_batch = WriteBatch::default(); 804 let t0 = Instant::now(); 805 if let ActionableEvent::DeleteAccount(did) = event { 806 let inner_batch_ops = self.delete_account(did, &mut outer_batch)?; 807 let total_batch_ops = inner_batch_ops + outer_batch.len(); 808 self.db.write(outer_batch)?; 809 let t_total = 
t0.elapsed(); 810 811 histogram!("storage_rocksdb_action_seconds", "action" => "delete_account") 812 .record(t_total.as_secs_f64()); 813 counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account") 814 .increment(total_batch_ops as u64); 815 histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64); 816 } 817 818 Ok(()) 819 } 820 821 fn to_readable(&mut self) -> impl LinkReader { 822 let mut readable = self.clone(); 823 readable.is_writer = false; 824 readable 825 } 826} 827 828impl LinkReader for RocksStorage { 829 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 830 let target_key = TargetKey( 831 Target(target.to_string()), 832 Collection(collection.to_string()), 833 RPath(path.to_string()), 834 ); 835 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 836 let (alive, _) = self.get_target_linkers(&target_id)?.count(); 837 Ok(alive) 838 } else { 839 Ok(0) 840 } 841 } 842 843 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 844 let target_key = TargetKey( 845 Target(target.to_string()), 846 Collection(collection.to_string()), 847 RPath(path.to_string()), 848 ); 849 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 850 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids()) 851 } else { 852 Ok(0) 853 } 854 } 855 856 fn get_links( 857 &self, 858 target: &str, 859 collection: &str, 860 path: &str, 861 limit: u64, 862 until: Option<u64>, 863 ) -> Result<PagedAppendingCollection<RecordId>> { 864 let target_key = TargetKey( 865 Target(target.to_string()), 866 Collection(collection.to_string()), 867 RPath(path.to_string()), 868 ); 869 870 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? 
        else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_target_linkers(&target_id)?;

        let (alive, gone) = linkers.count();
        let total = alive + gone;
        // page backwards from `until` (or the end); `next` is the cursor for the
        // following (older) page, None when this page reaches the beginning.
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, rkey) in did_id_rkeys {
            // DidId(0) is the deleted/duplicate sentinel
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(RecordId {
                    did,
                    collection: collection.to_string(),
                    rkey: rkey.0.clone(),
                });
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }

    /// Page (newest-first) through the distinct dids linking to a target.
    /// Mirrors get_links but deduplicates dids via get_distinct_target_linkers.
    fn get_distinct_dids(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<Did>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_distinct_target_linkers(&target_id)?;

        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, _) in did_id_rkeys {
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(did);
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }

    /// Alive-link counts per (collection, path) for every variant of a target.
    fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
        let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let (count, _) = self.get_target_linkers(&target_id)?.count();
            out.entry(collection.into())
                .or_default()
                .insert(path.clone(), count);
        }
        Ok(out)
    }

    /// Record + distinct-did counts per (collection, path) for a target.
    fn get_all_counts(
        &self,
        target: &str,
    ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
        let mut
        out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let target_linkers = self.get_target_linkers(&target_id)?;
            // (alive, gone): only live records are reported.
            let (records, _) = target_linkers.count();
            let distinct_dids = target_linkers.count_distinct_dids();
            out.entry(collection.into()).or_default().insert(
                path.clone(),
                CountsByCount {
                    records,
                    distinct_dids,
                },
            );
        }
        Ok(out)
    }

    /// Approximate storage stats: estimated key counts from the two id tables
    /// plus RocksDB's estimate-num-keys property on the forward-links CF.
    fn get_stats(&self) -> Result<StorageStats> {
        let dids = self.did_id_table.estimate_count();
        let targetables = self.target_id_table.estimate_count();
        let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        let linking_records = self
            .db
            .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
            .map(|s| s.parse::<u64>())
            .transpose()?
            .unwrap_or(0);
        Ok(StorageStats {
            dids,
            targetables,
            linking_records,
        })
    }
}

// Marker traits tying types to the bincode-backed rocksdb codec helpers
// (_rk/_rkp/_rv/_kr/_vr below). Implementing one declares which role a type
// may play (key, key prefix, value) for serialization/deserialization.
trait AsRocksKey: Serialize {}
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}

// did_id table
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// target_links table
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}

/// Shared bincode configuration. Big-endian so serialized numeric key
/// prefixes sort bytewise in the same order as their numeric values.
pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
}
// key -> bytes
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
// key prefix -> bytes, for prefix iteration over keys of type K
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
// value -> bytes
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
// bytes -> key
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
// bytes -> value
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}

// Newtype for a collection NSID (e.g. "a.b.c" in the tests below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

// Newtype for a record path within a collection (e.g. ".uri").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

// Newtype for a record key.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    // Sentinel used when tombstoning a deleted link entry.
    fn empty() -> Self {
        RKey("".to_string())
    }
}

// did ids
// Compact u64 id standing in for a full DID string; 0 is reserved as the
// empty/tombstone sentinel (see empty/is_empty).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    fn empty() -> Self {
        DidId(0)
    }
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

// Forward-table value: the assigned DidId plus whether the account is active.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}

// target ids
// Compact u64 id standing in for a full TargetKey.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetId(u64); // key
1130#[derive(Debug, Clone, Serialize, Deserialize)] 1131pub struct Target(pub String); // the actual target/uri 1132 1133// targets (uris, dids, etc.): the reverse index 1134#[derive(Debug, Clone, Serialize, Deserialize)] 1135pub struct TargetKey(pub Target, pub Collection, pub RPath); 1136 1137#[derive(Debug, Default, Serialize, Deserialize)] 1138pub struct TargetLinkers(pub Vec<(DidId, RKey)>); 1139 1140impl TargetLinkers { 1141 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool { 1142 if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) { 1143 *entry = (DidId::empty(), RKey::empty()); 1144 true 1145 } else { 1146 false 1147 } 1148 } 1149 pub fn count(&self) -> (u64, u64) { 1150 // (linkers, deleted links) 1151 let total = self.0.len() as u64; 1152 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64; 1153 let gone = total - alive; 1154 (alive, gone) 1155 } 1156 fn count_distinct_dids(&self) -> u64 { 1157 self.0 1158 .iter() 1159 .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) }) 1160 .collect::<HashSet<_>>() 1161 .len() as u64 1162 } 1163} 1164 1165// forward links to targets so we can delete links 1166#[derive(Debug, Serialize, Deserialize)] 1167struct RecordLinkKey(DidId, Collection, RKey); 1168 1169// does this even work???? 
// Key prefix for scanning all RecordLinkKey entries belonging to one DidId.
// Intended to bincode-serialize to a byte prefix of the full key — the TODO
// at the bottom of the tests notes this property still wants direct coverage.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

// Key prefix for scanning all TargetKey entries for one Target, across any
// collection/path.
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

// One forward-link entry: the record path a link appeared at and the id of
// the target it points to.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

// All targets linked from a single record; the value side of the
// record_link_targets table.
#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    // Preallocate for a known number of links.
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}

#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    /// Regression test: deleting an account must not blow up while iterating
    /// that account's record-link keys when another account's keys come later
    /// in did-id prefix order.
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    /// Checks that prefix_iter_cf visits exactly the keys sharing the given
    /// prefix, in order, and nothing outside it.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        // Minimal key/value types local to this test.
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        // One-field struct whose serialized form should byte-prefix Key's.
        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        // Borrow an existing CF for the test data; contents don't matter here.
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        // Every key under prefix 0x02 must be visited exactly once, in order;
        // keys under 0x01/0x03 must not appear at all.
        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}