Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 58 kB view raw
1use super::{ 2 ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, 3 StorageStats, 4}; 5use crate::{CountsByCount, Did, RecordId}; 6use anyhow::{bail, Result}; 7use bincode::Options as BincodeOptions; 8use links::CollectedLink; 9use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; 10use ratelimit::Ratelimiter; 11use rocksdb::backup::{BackupEngine, BackupEngineOptions}; 12use rocksdb::{ 13 AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands, 14 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 15}; 16use serde::{Deserialize, Serialize}; 17use std::collections::{BTreeMap, HashMap, HashSet}; 18use std::io::Read; 19use std::marker::PhantomData; 20use std::path::{Path, PathBuf}; 21use std::sync::{ 22 atomic::{AtomicU64, Ordering}, 23 Arc, 24}; 25use std::thread; 26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 27use tokio_util::sync::CancellationToken; 28 29static DID_IDS_CF: &str = "did_ids"; 30static TARGET_IDS_CF: &str = "target_ids"; 31static TARGET_LINKERS_CF: &str = "target_links"; 32static LINK_TARGETS_CF: &str = "link_targets"; 33 34static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor"; 35static STARTED_AT_KEY: &str = "jetstream_first_cursor"; 36// add reverse mappings for targets if this db was running before that was a thing 37static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state"; 38 39static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started 40 41#[derive(Debug, Clone, Serialize, Deserialize)] 42struct TargetIdRepairState { 43 /// start time for repair, microseconds timestamp 44 current_us_started_at: u64, 45 /// id table's latest id when repair started 46 id_when_started: u64, 47 /// id table id 48 latest_repaired_i: u64, 49} 50impl AsRocksValue for TargetIdRepairState {} 51impl ValueFromRocks for TargetIdRepairState {} 52 53// todo: actually understand and set these options probably better 54fn rocks_opts_base() -> Options { 55 let mut opts = Options::default(); 56 opts.set_level_compaction_dynamic_level_bytes(true); 57 opts.create_if_missing(true); 58 opts.set_compression_type(rocksdb::DBCompressionType::Lz4); 59 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled 60 // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive. 61 opts 62} 63fn get_db_opts() -> Options { 64 let mut opts = rocks_opts_base(); 65 opts.create_missing_column_families(true); 66 opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores 67 // consider doing optimize_level_style_compaction or optimize_universal_style_compaction 68 opts 69} 70fn get_db_read_opts() -> Options { 71 let mut opts = Options::default(); 72 opts.optimize_for_point_lookup(16_384); // mb (run this on big machines) 73 opts 74} 75 76#[derive(Debug, Clone)] 77pub struct RocksStorage { 78 pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun) 79 did_id_table: IdTable<Did, DidIdValue>, 80 target_id_table: IdTable<TargetKey, TargetId>, 81 is_writer: bool, 82 backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>, 83} 84 85trait IdTableValue: ValueFromRocks + Clone { 86 fn new(v: u64) -> Self; 87 fn id(&self) -> u64; 88} 89#[derive(Debug, Clone)] 90struct IdTableBase<Orig, IdVal: IdTableValue> 91where 92 Orig: KeyFromRocks, 93 for<'a> &'a Orig: AsRocksKey, 94{ 95 _key_marker: PhantomData<Orig>, 96 _val_marker: PhantomData<IdVal>, 97 name: String, 98 id_seq: Arc<AtomicU64>, 99} 100impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal> 101where 102 Orig: KeyFromRocks, 103 for<'a> &'a Orig: AsRocksKey, 104{ 105 fn cf_descriptor(&self) -> ColumnFamilyDescriptor { 106 ColumnFamilyDescriptor::new(&self.name, rocks_opts_base()) 107 } 108 fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> { 109 if db.cf_handle(&self.name).is_none() { 110 bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?"); 111 } 112 let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? { 113 if seq_bytes.len() != 8 { 114 bail!( 115 "reading bytes for u64 id seq {:?}: found the wrong number of bytes", 116 self.seq_key() 117 ); 118 } 119 let mut buf: [u8; 8] = [0; 8]; 120 seq_bytes.as_slice().read_exact(&mut buf)?; 121 let last_seq = u64::from_le_bytes(buf); 122 last_seq + 1 123 } else { 124 1 125 }; 126 self.id_seq.store(priv_id_seq, Ordering::SeqCst); 127 Ok(IdTable { 128 base: self, 129 priv_id_seq, 130 }) 131 } 132 fn seq_key(&self) -> Vec<u8> { 133 let mut k = b"__id_seq_key_plz_be_unique:".to_vec(); 134 k.extend(self.name.as_bytes()); 135 k 136 } 137} 138#[derive(Debug, Clone)] 139struct IdTable<Orig, IdVal: IdTableValue> 140where 141 Orig: KeyFromRocks, 142 for<'a> &'a Orig: AsRocksKey, 143{ 144 base: IdTableBase<Orig, IdVal>, 145 priv_id_seq: u64, 146} 147impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal> 148where 149 Orig: KeyFromRocks, 150 for<'v> &'v IdVal: AsRocksValue, 151 for<'k> &'k Orig: AsRocksKey, 152{ 153 #[must_use] 154 fn setup(name: &str) -> IdTableBase<Orig, IdVal> { 155 IdTableBase::<Orig, IdVal> { 156 _key_marker: PhantomData, 157 _val_marker: PhantomData, 158 name: name.into(), 159 id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1 160 } 161 } 162 fn get_id_val( 163 &self, 164 db: &DBWithThreadMode<MultiThreaded>, 165 orig: &Orig, 166 ) -> Result<Option<IdVal>> { 167 let cf = db.cf_handle(&self.base.name).unwrap(); 168 if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? { 169 Ok(Some(_vr(&_id_bytes)?)) 170 } else { 171 Ok(None) 172 } 173 } 174 fn __get_or_create_id_val<CF>( 175 &mut self, 176 cf: &CF, 177 db: &DBWithThreadMode<MultiThreaded>, 178 batch: &mut WriteBatch, 179 orig: &Orig, 180 ) -> Result<IdVal> 181 where 182 CF: AsColumnFamilyRef, 183 { 184 Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| { 185 let prev_priv_seq = self.priv_id_seq; 186 self.priv_id_seq += 1; 187 let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst); 188 assert_eq!( 189 prev_public_seq, prev_priv_seq, 190 "public seq may have been modified??" 191 ); 192 let id_value = IdVal::new(self.priv_id_seq); 193 batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes()); 194 batch.put_cf(cf, _rk(orig), _rv(&id_value)); 195 id_value 196 })) 197 } 198 199 fn estimate_count(&self) -> u64 { 200 self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved 201 } 202 203 fn get_or_create_id_val( 204 &mut self, 205 db: &DBWithThreadMode<MultiThreaded>, 206 batch: &mut WriteBatch, 207 orig: &Orig, 208 ) -> Result<IdVal> { 209 let cf = db.cf_handle(&self.base.name).unwrap(); 210 let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?; 211 // TODO: assert that the original is never a u64 that could collide 212 batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/ 213 Ok(id_val) 214 } 215 216 fn get_val_from_id( 217 &self, 218 db: &DBWithThreadMode<MultiThreaded>, 219 id: u64, 220 ) -> Result<Option<Orig>> { 221 let cf = db.cf_handle(&self.base.name).unwrap(); 222 if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? { 223 // HACK ish 224 Ok(Some(_kr(&orig_bytes)?)) 225 } else { 226 Ok(None) 227 } 228 } 229} 230 231impl IdTableValue for DidIdValue { 232 fn new(v: u64) -> Self { 233 DidIdValue(DidId(v), true) 234 } 235 fn id(&self) -> u64 { 236 self.0 .0 237 } 238} 239impl IdTableValue for TargetId { 240 fn new(v: u64) -> Self { 241 TargetId(v) 242 } 243 fn id(&self) -> u64 { 244 self.0 245 } 246} 247 248fn now() -> u64 { 249 SystemTime::now() 250 .duration_since(UNIX_EPOCH) 251 .unwrap() 252 .as_micros() as u64 253} 254 255impl RocksStorage { 256 pub fn new(path: impl AsRef<Path>) -> Result<Self> { 257 Self::describe_metrics(); 258 let me = RocksStorage::open_readmode(path, false)?; 259 me.global_init()?; 260 Ok(me) 261 } 262 263 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> { 264 RocksStorage::open_readmode(path, true) 265 } 266 267 fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> { 268 let did_id_table = IdTable::setup(DID_IDS_CF); 269 let target_id_table = IdTable::setup(TARGET_IDS_CF); 270 271 // note: global stuff like jetstream cursor goes in the default cf 272 // these are bonus extra cfs 273 let cfs = vec![ 274 // id reference tables 275 did_id_table.cf_descriptor(), 276 target_id_table.cf_descriptor(), 277 // the reverse links: 278 ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, { 279 let mut opts = rocks_opts_base(); 280 opts.set_merge_operator_associative( 281 "merge_op_extend_did_ids", 282 Self::merge_op_extend_did_ids, 283 ); 284 opts 285 }), 286 // unfortunately we also need forward links to handle deletes 287 ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()), 288 ]; 289 290 let db = if readonly { 291 DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)? 292 } else { 293 DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)? 294 }; 295 296 let db = Arc::new(db); 297 let did_id_table = did_id_table.init(&db)?; 298 let target_id_table = target_id_table.init(&db)?; 299 Ok(Self { 300 db, 301 did_id_table, 302 target_id_table, 303 is_writer: !readonly, 304 backup_task: None.into(), 305 }) 306 } 307 308 fn global_init(&self) -> Result<()> { 309 let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some(); 310 if first_run { 311 self.db.put(STARTED_AT_KEY, _rv(now()))?; 312 313 // hack / temporary: if we're a new db, put in a completed repair 314 // state so we don't run repairs (repairs are for old-code dbs) 315 let completed = TargetIdRepairState { 316 id_when_started: 0, 317 current_us_started_at: 0, 318 latest_repaired_i: 0, 319 }; 320 self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?; 321 } 322 Ok(()) 323 } 324 325 pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> { 326 let mut state = match self 327 .db 328 .get(TARGET_ID_REPAIR_STATE_KEY)? 329 .map(|s| _vr(&s)) 330 .transpose()? 331 { 332 Some(s) => s, 333 None => TargetIdRepairState { 334 id_when_started: self.did_id_table.priv_id_seq, 335 current_us_started_at: now(), 336 latest_repaired_i: 0, 337 }, 338 }; 339 340 eprintln!("initial repair state: {state:?}"); 341 342 let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap(); 343 344 let mut iter = self.db.raw_iterator_cf(&cf); 345 iter.seek_to_first(); 346 347 eprintln!("repair iterator sent to first key"); 348 349 // skip ahead if we're done some, or take a single first step 350 for _ in 0..state.latest_repaired_i { 351 iter.next(); 352 } 353 354 eprintln!( 355 "repair iterator skipped to {}th key", 356 state.latest_repaired_i 357 ); 358 359 let mut maybe_done = false; 360 361 let mut write_fast = rocksdb::WriteOptions::default(); 362 write_fast.set_sync(false); 363 write_fast.disable_wal(true); 364 365 while !stay_alive.is_cancelled() && !maybe_done { 366 // let mut batch = WriteBatch::default(); 367 368 let mut any_written = false; 369 370 for _ in 0..1000 { 371 if state.latest_repaired_i % 1_000_000 == 0 { 372 eprintln!("target iter at {}", state.latest_repaired_i); 373 } 374 state.latest_repaired_i += 1; 375 376 if !iter.valid() { 377 eprintln!("invalid iter, are we done repairing?"); 378 maybe_done = true; 379 break; 380 }; 381 382 // eprintln!("iterator seems to be valid! getting the key..."); 383 let raw_key = iter.key().unwrap(); 384 if raw_key.len() == 8 { 385 // eprintln!("found an 8-byte key, skipping it since it's probably an id..."); 386 iter.next(); 387 continue; 388 } 389 let target: TargetKey = _kr::<TargetKey>(raw_key)?; 390 let target_id: TargetId = _vr(iter.value().unwrap())?; 391 392 self.db 393 .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?; 394 any_written = true; 395 iter.next(); 396 } 397 398 if any_written { 399 self.db 400 .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?; 401 std::thread::sleep(breather); 402 } 403 } 404 405 eprintln!("repair iterator done."); 406 407 Ok(false) 408 } 409 410 pub fn start_backup( 411 &mut self, 412 path: PathBuf, 413 auto: Option<(u64, Option<usize>)>, 414 stay_alive: CancellationToken, 415 ) -> Result<()> { 416 let task = if let Some((interval_hrs, copies)) = auto { 417 eprintln!("backups: starting background task..."); 418 self.backup_task(path, interval_hrs, copies, stay_alive) 419 } else { 420 eprintln!("backups: starting a one-off backup..."); 421 thread::spawn({ 422 let db = self.db.clone(); 423 move || Self::do_backup(db, path) 424 }) 425 }; 426 self.backup_task = Arc::new(Some(task)); 427 Ok(()) 428 } 429 430 fn backup_task( 431 &self, 432 path: PathBuf, 433 interval_hrs: u64, 434 copies: Option<usize>, 435 stay_alive: CancellationToken, 436 ) -> std::thread::JoinHandle<Result<()>> { 437 let db = self.db.clone(); 438 thread::spawn(move || { 439 let limit = 440 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?; 441 let minimum_sleep = Duration::from_secs(1); 442 443 'quit: loop { 444 if let Err(sleep) = limit.try_wait() { 445 eprintln!("backups: background: next backup scheduled in {sleep:?}"); 446 let waiting = Instant::now(); 447 loop { 448 let remaining = sleep - waiting.elapsed(); 449 if stay_alive.is_cancelled() { 450 break 'quit; 451 } else if remaining <= Duration::ZERO { 452 break; 453 } else if remaining < minimum_sleep { 454 thread::sleep(remaining); 455 break; 456 } else { 457 thread::sleep(minimum_sleep); 458 } 459 } 460 } 461 eprintln!("backups: background: starting backup..."); 462 if let Err(e) = Self::do_backup(db.clone(), &path) { 463 eprintln!("backups: background: backup failed: {e:?}"); 464 // todo: metrics 465 } else { 466 eprintln!("backups: background: backup succeeded yay"); 467 } 468 if let Some(copies) = copies { 469 eprintln!("backups: background: trimming to {copies} saved backups..."); 470 if let Err(e) = Self::trim_backups(copies, &path) { 471 eprintln!("backups: background: failed to trim backups: {e:?}"); 472 } else { 473 eprintln!("backups: background: trimming worked!") 474 } 475 } 476 } 477 478 Ok(()) 479 }) 480 } 481 482 fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> { 483 let mut engine = 484 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?; 485 eprintln!("backups: starting backup..."); 486 let t0 = Instant::now(); 487 if let Err(e) = engine.create_new_backup(&db) { 488 eprintln!("backups: oh no, backup failed: {e:?}"); 489 } else { 490 eprintln!("backups: yay, backup worked?"); 491 } 492 eprintln!( 493 "backups: backup finished after {:.2}s", 494 t0.elapsed().as_secs_f32() 495 ); 496 Ok(()) 497 } 498 499 fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> { 500 let mut engine = 501 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?; 502 engine.purge_old_backups(num_backups_to_keep)?; 503 Ok(()) 504 } 505 506 fn describe_metrics() { 507 describe_histogram!( 508 "storage_rocksdb_read_seconds", 509 Unit::Seconds, 510 "duration of the read stage of actions" 511 ); 512 describe_histogram!( 513 "storage_rocksdb_action_seconds", 514 Unit::Seconds, 515 "duration of read + write of actions" 516 ); 517 describe_counter!( 518 "storage_rocksdb_batch_ops_total", 519 Unit::Count, 520 "total batched operations from actions" 521 ); 522 describe_histogram!( 523 "storage_rocksdb_delete_account_ops", 524 Unit::Count, 525 "total batched ops for account deletions" 526 ); 527 } 528 529 fn merge_op_extend_did_ids( 530 key: &[u8], 531 existing: Option<&[u8]>, 532 operands: &MergeOperands, 533 ) -> Option<Vec<u8>> { 534 let mut linkers: Vec<_> = if let Some(existing_bytes) = existing { 535 match _vr(existing_bytes) { 536 Ok(TargetLinkers(mut existing_linkers)) => { 537 existing_linkers.reserve(operands.len()); 538 existing_linkers 539 } 540 Err(e) => { 541 eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!"); 542 if existing_bytes.len() < 1000 { 543 eprintln!("dropping: {existing_bytes:?}"); 544 } else { 545 eprintln!("(too long to print)"); 546 } 547 Vec::with_capacity(operands.len()) 548 } 549 } 550 } else { 551 Vec::with_capacity(operands.len()) 552 }; 553 for new_linkers in operands { 554 match _vr(new_linkers) { 555 Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers), 556 Err(e) => { 557 eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!"); 558 if new_linkers.len() < 1000 { 559 eprintln!("skipping: {new_linkers:?}"); 560 } else { 561 eprintln!("(too long to print)"); 562 } 563 } 564 } 565 } 566 Some(_rv(&TargetLinkers(linkers))) 567 } 568 569 fn prefix_iter_cf<K, V, CF, P>( 570 &self, 571 cf: &CF, 572 pre: P, 573 ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P> 574 where 575 K: KeyFromRocks, 576 V: ValueFromRocks, 577 CF: AsColumnFamilyRef, 578 for<'a> &'a P: AsRocksKeyPrefix<K>, 579 { 580 let mut read_opts = ReadOptions::default(); 581 read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds? 582 self.db 583 .iterator_cf_opt(cf, read_opts, IteratorMode::Start) 584 .map_while(Result::ok) 585 .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?))) 586 } 587 588 fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool> 589 where 590 F: FnOnce(DidIdValue) -> Option<DidIdValue>, 591 { 592 let cf = self.db.cf_handle(DID_IDS_CF).unwrap(); 593 let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else { 594 return Ok(false); 595 }; 596 let Some(new_did_id_value) = update(did_id_value) else { 597 return Ok(false); 598 }; 599 batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value)); 600 Ok(true) 601 } 602 fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) { 603 let cf = self.db.cf_handle(DID_IDS_CF).unwrap(); 604 batch.delete_cf(&cf, _rk(did)); 605 } 606 607 fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> { 608 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap(); 609 let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else { 610 return Ok(TargetLinkers::default()); 611 }; 612 _vr(&linkers_bytes) 613 } 614 /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh 615 fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> { 616 let mut seen = HashSet::new(); 617 let mut linkers = self.get_target_linkers(target_id)?; 618 for (did_id, _) in linkers.0.iter_mut() { 619 if seen.contains(did_id) { 620 did_id.0 = 0; 621 } else { 622 seen.insert(*did_id); 623 } 624 } 625 Ok(linkers) 626 } 627 fn merge_target_linker( 628 &self, 629 batch: &mut WriteBatch, 630 target_id: &TargetId, 631 linker_did_id: &DidId, 632 linker_rkey: &RKey, 633 ) { 634 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap(); 635 batch.merge_cf( 636 &cf, 637 _rk(target_id), 638 _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])), 639 ); 640 } 641 fn update_target_linkers<F>( 642 &self, 643 batch: &mut WriteBatch, 644 target_id: &TargetId, 645 update: F, 646 ) -> Result<bool> 647 where 648 F: FnOnce(TargetLinkers) -> Option<TargetLinkers>, 649 { 650 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap(); 651 let existing_linkers = self.get_target_linkers(target_id)?; 652 let Some(new_linkers) = update(existing_linkers) else { 653 return Ok(false); 654 }; 655 batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers)); 656 Ok(true) 657 } 658 659 fn put_link_targets( 660 &self, 661 batch: &mut WriteBatch, 662 record_link_key: &RecordLinkKey, 663 targets: &RecordLinkTargets, 664 ) { 665 // todo: we are almost idempotent to link creates with this blind write, but we'll still 666 // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on 667 // the path that we need to be fast. we could go back to a merge op and probably be 668 // consistent. or we can accept just a littttttle inconsistency and be idempotent on 669 // forward links but not reverse, slightly messing up deletes :/ 670 // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start, 671 // then switch to the fast version? 672 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 673 batch.put_cf(&cf, _rk(record_link_key), _rv(targets)); 674 } 675 fn get_record_link_targets( 676 &self, 677 record_link_key: &RecordLinkKey, 678 ) -> Result<Option<RecordLinkTargets>> { 679 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 680 if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? { 681 Ok(Some(_vr(&bytes)?)) 682 } else { 683 Ok(None) 684 } 685 } 686 fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) { 687 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 688 batch.delete_cf(&cf, _rk(record_link_key)); 689 } 690 fn iter_links_for_did_id( 691 &self, 692 did_id: &DidId, 693 ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> { 694 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 695 self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id)) 696 } 697 fn iter_targets_for_target( 698 &self, 699 target: &Target, 700 ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> { 701 let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap(); 702 self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone())) 703 } 704 705 // 706 // higher-level event action handlers 707 // 708 709 fn add_links( 710 &mut self, 711 record_id: &RecordId, 712 links: &[CollectedLink], 713 batch: &mut WriteBatch, 714 ) -> Result<()> { 715 let DidIdValue(did_id, _) = 716 self.did_id_table 717 .get_or_create_id_val(&self.db, batch, &record_id.did)?; 718 719 let record_link_key = RecordLinkKey( 720 did_id, 721 Collection(record_id.collection()), 722 RKey(record_id.rkey()), 723 ); 724 let mut record_link_targets = RecordLinkTargets::with_capacity(links.len()); 725 726 for CollectedLink { target, path } in links { 727 let target_key = TargetKey( 728 Target(target.clone().into_string()), 729 Collection(record_id.collection()), 730 RPath(path.clone()), 731 ); 732 let target_id = 733 self.target_id_table 734 .get_or_create_id_val(&self.db, batch, &target_key)?; 735 self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey())); 736 737 record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id)) 738 } 739 740 self.put_link_targets(batch, &record_link_key, &record_link_targets); 741 Ok(()) 742 } 743 744 fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> { 745 let Some(DidIdValue(linking_did_id, _)) = 746 self.did_id_table.get_id_val(&self.db, &record_id.did)? 747 else { 748 return Ok(()); // we don't know her: nothing to do 749 }; 750 751 let record_link_key = RecordLinkKey( 752 linking_did_id, 753 Collection(record_id.collection()), 754 RKey(record_id.rkey()), 755 ); 756 let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else { 757 return Ok(()); // we don't have these links 758 }; 759 760 // we do read -> modify -> write here: could merge-op in the deletes instead? 761 // otherwise it's another single-thread-constraining thing. 762 for RecordLinkTarget(_, target_id) in record_link_targets.0 { 763 self.update_target_linkers(batch, &target_id, |mut linkers| { 764 if linkers.0.is_empty() { 765 eprintln!("bug? linked target was missing when removing links"); 766 } 767 if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) { 768 eprintln!("bug? linked target was missing a link when removing links"); 769 } 770 Some(linkers) 771 })?; 772 } 773 774 self.delete_record_link(batch, &record_link_key); 775 Ok(()) 776 } 777 778 fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> { 779 // this needs to be read-modify-write since the did_id needs to stay the same, 780 // which has a benefit of allowing to avoid adding entries for dids we don't 781 // need. reading on dids needs to be cheap anyway for the current design, and 782 // did active/inactive updates are low-freq in the firehose so, eh, it's fine. 783 self.update_did_id_value(batch, did, |current_value| { 784 Some(DidIdValue(current_value.did_id(), active)) 785 })?; 786 Ok(()) 787 } 788 789 fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> { 790 let mut total_batched_ops = 0; 791 let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else { 792 return Ok(total_batched_ops); // ignore updates for dids we don't know about 793 }; 794 self.delete_did_id_value(batch, did); 795 // TODO: also delete the reverse!! 796 797 // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size. 798 // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared. 799 // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully 800 // TODO: queue a background delete task or whatever 801 // TODO: test delete account with more links than chunk size 802 let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect(); 803 for chunk in stuff.chunks(1024) { 804 let mut mini_batch = WriteBatch::default(); 805 806 for (record_link_key, links) in chunk { 807 self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better 808 809 for RecordLinkTarget(_, target_link_id) in links.0.iter() { 810 self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| { 811 if !linkers.remove_linker(&did_id, &record_link_key.2) { 812 eprintln!("bug? could not find linker when removing links while deleting an account"); 813 } 814 Some(linkers) 815 })?; 816 } 817 } 818 total_batched_ops += mini_batch.len(); 819 self.db.write(mini_batch)?; // todo 820 } 821 Ok(total_batched_ops) 822 } 823} 824 825impl Drop for RocksStorage { 826 fn drop(&mut self) { 827 if self.is_writer { 828 println!("rocksdb writer: cleaning up for shutdown..."); 829 if let Err(e) = self.db.flush_wal(true) { 830 eprintln!("rocks: flushing wal failed: {e:?}"); 831 } 832 if let Err(e) = self.db.flush_opt(&{ 833 let mut opt = rocksdb::FlushOptions::default(); 834 opt.set_wait(true); 835 opt 836 }) { 837 eprintln!("rocks: flushing memtables failed: {e:?}"); 838 } 839 match Arc::get_mut(&mut self.backup_task) { 840 Some(maybe_task) => { 841 if let Some(task) = maybe_task.take() { 842 eprintln!("waiting for backup task to complete..."); 843 if let Err(e) = task.join() { 844 eprintln!("failed to join backup task: {e:?}"); 845 } 846 } 847 } 848 None => eprintln!("rocks: failed to get backup task, likely a bug."), 849 } 850 self.db.cancel_all_background_work(true); 851 } 852 } 853} 854 855impl AsRocksValue for u64 {} 856impl ValueFromRocks for u64 {} 857 858impl LinkStorage for RocksStorage { 859 fn get_cursor(&mut self) -> Result<Option<u64>> { 860 self.db 861 .get(JETSTREAM_CURSOR_KEY)? 862 .map(|b| _vr(&b)) 863 .transpose() 864 } 865 866 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> { 867 // normal ops 868 let mut batch = WriteBatch::default(); 869 let t0 = Instant::now(); 870 if let Some(action) = match event { 871 ActionableEvent::CreateLinks { record_id, links } => { 872 self.add_links(record_id, links, &mut batch)?; 873 Some("create_links") 874 } 875 ActionableEvent::UpdateLinks { 876 record_id, 877 new_links, 878 } => { 879 self.remove_links(record_id, &mut batch)?; 880 self.add_links(record_id, new_links, &mut batch)?; 881 Some("update_links") 882 } 883 ActionableEvent::DeleteRecord(record_id) => { 884 self.remove_links(record_id, &mut batch)?; 885 Some("delete_record") 886 } 887 ActionableEvent::ActivateAccount(did) => { 888 self.set_account(did, true, &mut batch)?; 889 Some("set_account_status") 890 } 891 ActionableEvent::DeactivateAccount(did) => { 892 self.set_account(did, false, &mut batch)?; 893 Some("set_account_status") 894 } 895 ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially 896 } { 897 let t_read = t0.elapsed(); 898 batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor)); 899 let batch_ops = batch.len(); 900 self.db.write(batch)?; 901 let t_total = t0.elapsed(); 902 903 histogram!("storage_rocksdb_read_seconds", "action" => action) 904 .record(t_read.as_secs_f64()); 905 histogram!("storage_rocksdb_action_seconds", "action" => action) 906 .record(t_total.as_secs_f64()); 907 counter!("storage_rocksdb_batch_ops_total", "action" => action) 908 .increment(batch_ops as u64); 909 } 910 911 // special metrics for account deletion which can be arbitrarily expensive 912 let mut outer_batch = WriteBatch::default(); 913 let t0 = Instant::now(); 914 if let ActionableEvent::DeleteAccount(did) = event { 915 let inner_batch_ops = self.delete_account(did, &mut outer_batch)?; 916 let total_batch_ops = inner_batch_ops + outer_batch.len(); 917 self.db.write(outer_batch)?; 918 let t_total = t0.elapsed(); 919 920 histogram!("storage_rocksdb_action_seconds", "action" => "delete_account") 921 .record(t_total.as_secs_f64()); 922 counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account") 923 .increment(total_batch_ops as u64); 924 histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64); 925 } 926 927 Ok(()) 928 } 929 930 fn to_readable(&mut self) -> impl LinkReader { 931 let mut readable = self.clone(); 932 readable.is_writer = false; 933 readable 934 } 935} 936 937impl LinkReader for RocksStorage { 938 fn get_many_to_many_counts( 939 &self, 940 target: &str, 941 collection: &str, 942 path: &str, 943 path_to_other: &str, 944 limit: u64, 945 after: Option<String>, 946 filter_dids: &HashSet<Did>, 947 filter_to_targets: &HashSet<String>, 948 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 949 let collection = Collection(collection.to_string()); 950 let path = RPath(path.to_string()); 951 952 let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone()); 953 954 // unfortunately the cursor is a, uh, stringified number. 955 // this was easier for the memstore (plain target, not target id), and 956 // making it generic is a bit awful. 957 // so... parse the number out of a string here :( 958 // TODO: this should bubble up to a BAD_REQUEST response 959 let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 960 961 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 962 eprintln!("nothin doin for this target, {target_key:?}"); 963 return Ok(Default::default()); 964 }; 965 966 let filter_did_ids: HashMap<DidId, bool> = filter_dids 967 .iter() 968 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 969 .collect::<Result<Vec<DidIdValue>>>()? 970 .into_iter() 971 .map(|DidIdValue(id, active)| (id, active)) 972 .collect(); 973 974 // stored targets are keyed by triples of (target, collection, path). 975 // target filtering only consideres the target itself, so we actually 976 // need to do a prefix iteration of all target ids for this target and 977 // keep them all. 978 // i *think* the number of keys at a target prefix should usually be 979 // pretty small, so this is hopefully fine. but if it turns out to be 980 // large, we can push this filtering back into the main links loop and 981 // do forward db queries per backlink to get the raw target back out. 982 let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 983 for t in filter_to_targets { 984 for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { 985 filter_to_target_ids.insert(target_id); 986 } 987 } 988 989 let linkers = self.get_target_linkers(&target_id)?; 990 991 let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new(); 992 993 for (did_id, rkey) in linkers.0 { 994 if did_id.is_empty() { 995 continue; 996 } 997 998 if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { 999 continue; 1000 } 1001 1002 let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey); 1003 let Some(targets) = self.get_record_link_targets(&record_link_key)? else { 1004 continue; 1005 }; 1006 1007 let Some(fwd_target) = targets 1008 .0 1009 .into_iter() 1010 .filter_map(|RecordLinkTarget(rpath, target_id)| { 1011 if rpath.0 == path_to_other 1012 && (filter_to_target_ids.is_empty() 1013 || filter_to_target_ids.contains(&target_id)) 1014 { 1015 Some(target_id) 1016 } else { 1017 None 1018 } 1019 }) 1020 .take(1) 1021 .next() 1022 else { 1023 eprintln!("no forward match"); 1024 continue; 1025 }; 1026 1027 // small relief: we page over target ids, so we can already bail 1028 // reprocessing previous pages here 1029 if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) { 1030 continue; 1031 } 1032 1033 // aand we can skip target ids that must be on future pages 1034 // (this check continues after the did-lookup, which we have to do) 1035 let page_is_full = grouped_counts.len() as u64 >= limit; 1036 if page_is_full { 1037 let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh 1038 if fwd_target > *current_max { 1039 continue; 1040 } 1041 } 1042 1043 // bit painful: 2-step lookup to make sure this did is active 1044 let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1045 eprintln!("failed to look up did from did_id {did_id:?}"); 1046 continue; 1047 }; 1048 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1049 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1050 continue; 1051 }; 1052 if !active { 1053 continue; 1054 } 1055 1056 // page-management, continued 1057 // if we have a full page, and we're inserting a *new* key less than 1058 // the current max, then we can evict the current max 1059 let mut should_evict = false; 1060 let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| { 1061 // this is a *new* key, so kick the max if we're full 1062 should_evict = page_is_full; 1063 Default::default() 1064 }); 1065 entry.0 += 1; 1066 entry.1.insert(did_id); 1067 1068 if should_evict { 1069 grouped_counts.pop_last(); 1070 } 1071 } 1072 1073 let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len()); 1074 for (target_id, (n, dids)) in &grouped_counts { 1075 let Some(target) = self 1076 .target_id_table 1077 .get_val_from_id(&self.db, target_id.0)? 1078 else { 1079 eprintln!("failed to look up target from target_id {target_id:?}"); 1080 continue; 1081 }; 1082 items.push((target.0 .0, *n, dids.len() as u64)); 1083 } 1084 1085 let next = if grouped_counts.len() as u64 >= limit { 1086 // yeah.... it's a number saved as a string......sorry 1087 grouped_counts 1088 .keys() 1089 .next_back() 1090 .map(|k| format!("{}", k.0)) 1091 } else { 1092 None 1093 }; 1094 1095 Ok(PagedOrderedCollection { items, next }) 1096 } 1097 1098 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 1099 let target_key = TargetKey( 1100 Target(target.to_string()), 1101 Collection(collection.to_string()), 1102 RPath(path.to_string()), 1103 ); 1104 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 1105 let (alive, _) = self.get_target_linkers(&target_id)?.count(); 1106 Ok(alive) 1107 } else { 1108 Ok(0) 1109 } 1110 } 1111 1112 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 1113 let target_key = TargetKey( 1114 Target(target.to_string()), 1115 Collection(collection.to_string()), 1116 RPath(path.to_string()), 1117 ); 1118 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 1119 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids()) 1120 } else { 1121 Ok(0) 1122 } 1123 } 1124 1125 fn get_links( 1126 &self, 1127 target: &str, 1128 collection: &str, 1129 path: &str, 1130 limit: u64, 1131 until: Option<u64>, 1132 filter_dids: &HashSet<Did>, 1133 ) -> Result<PagedAppendingCollection<RecordId>> { 1134 let target_key = TargetKey( 1135 Target(target.to_string()), 1136 Collection(collection.to_string()), 1137 RPath(path.to_string()), 1138 ); 1139 1140 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1141 return Ok(PagedAppendingCollection { 1142 version: (0, 0), 1143 items: Vec::new(), 1144 next: None, 1145 total: 0, 1146 }); 1147 }; 1148 1149 let mut linkers = self.get_target_linkers(&target_id)?; 1150 if !filter_dids.is_empty() { 1151 let mut did_filter = HashSet::new(); 1152 for did in filter_dids { 1153 let Some(DidIdValue(did_id, active)) = 1154 self.did_id_table.get_id_val(&self.db, did)? 1155 else { 1156 eprintln!("failed to find a did_id for {did:?}"); 1157 continue; 1158 }; 1159 if !active { 1160 eprintln!("excluding inactive did from filtered results"); 1161 continue; 1162 } 1163 did_filter.insert(did_id); 1164 } 1165 linkers.0.retain(|linker| did_filter.contains(&linker.0)); 1166 } 1167 1168 let (alive, gone) = linkers.count(); 1169 let total = alive + gone; 1170 let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1171 let begin = end.saturating_sub(limit as usize); 1172 let next = if begin == 0 { None } else { Some(begin as u64) }; 1173 1174 let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1175 1176 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1177 // TODO: use get-many (or multi-get or whatever it's called) 1178 for (did_id, rkey) in did_id_rkeys { 1179 if did_id.is_empty() { 1180 continue; 1181 } 1182 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 1183 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 1184 else { 1185 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1186 continue; 1187 }; 1188 if !active { 1189 continue; 1190 } 1191 items.push(RecordId { 1192 did, 1193 collection: collection.to_string(), 1194 rkey: rkey.0.clone(), 1195 }); 1196 } else { 1197 eprintln!("failed to look up did from did_id {did_id:?}"); 1198 } 1199 } 1200 1201 Ok(PagedAppendingCollection { 1202 version: (total, gone), 1203 items, 1204 next, 1205 total: alive, 1206 }) 1207 } 1208 1209 fn get_distinct_dids( 1210 &self, 1211 target: &str, 1212 collection: &str, 1213 path: &str, 1214 limit: u64, 1215 until: Option<u64>, 1216 ) -> Result<PagedAppendingCollection<Did>> { 1217 let target_key = TargetKey( 1218 Target(target.to_string()), 1219 Collection(collection.to_string()), 1220 RPath(path.to_string()), 1221 ); 1222 1223 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1224 return Ok(PagedAppendingCollection { 1225 version: (0, 0), 1226 items: Vec::new(), 1227 next: None, 1228 total: 0, 1229 }); 1230 }; 1231 1232 let linkers = self.get_distinct_target_linkers(&target_id)?; 1233 1234 let (alive, gone) = linkers.count(); 1235 let total = alive + gone; 1236 let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1237 let begin = end.saturating_sub(limit as usize); 1238 let next = if begin == 0 { None } else { Some(begin as u64) }; 1239 1240 let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1241 1242 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1243 // TODO: use get-many (or multi-get or whatever it's called) 1244 for (did_id, _) in did_id_rkeys { 1245 if did_id.is_empty() { 1246 continue; 1247 } 1248 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 1249 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 1250 else { 1251 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1252 continue; 1253 }; 1254 if !active { 1255 continue; 1256 } 1257 items.push(did); 1258 } else { 1259 eprintln!("failed to look up did from did_id {did_id:?}"); 1260 } 1261 } 1262 1263 Ok(PagedAppendingCollection { 1264 version: (total, gone), 1265 items, 1266 next, 1267 total: alive, 1268 }) 1269 } 1270 1271 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> { 1272 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new(); 1273 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) { 1274 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key; 1275 let (count, _) = self.get_target_linkers(&target_id)?.count(); 1276 out.entry(collection.into()) 1277 .or_default() 1278 .insert(path.clone(), count); 1279 } 1280 Ok(out) 1281 } 1282 1283 fn get_all_counts( 1284 &self, 1285 target: &str, 1286 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> { 1287 let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new(); 1288 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) { 1289 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key; 1290 let target_linkers = self.get_target_linkers(&target_id)?; 1291 let (records, _) = target_linkers.count(); 1292 let distinct_dids = target_linkers.count_distinct_dids(); 1293 out.entry(collection.into()).or_default().insert( 1294 path.clone(), 1295 CountsByCount { 1296 records, 1297 distinct_dids, 1298 }, 1299 ); 1300 } 1301 Ok(out) 1302 } 1303 1304 fn get_stats(&self) -> Result<StorageStats> { 1305 let dids = self.did_id_table.estimate_count(); 1306 let targetables = self.target_id_table.estimate_count(); 1307 let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap(); 1308 let linking_records = self 1309 .db 1310 .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)? 1311 .map(|s| s.parse::<u64>()) 1312 .transpose()? 1313 .unwrap_or(0); 1314 let started_at = self 1315 .db 1316 .get(STARTED_AT_KEY)? 1317 .map(|c| _vr(&c)) 1318 .transpose()? 1319 .unwrap_or(COZY_FIRST_CURSOR); 1320 1321 let other_data = self 1322 .db 1323 .get(TARGET_ID_REPAIR_STATE_KEY)? 1324 .map(|s| _vr(&s)) 1325 .transpose()? 1326 .map( 1327 |TargetIdRepairState { 1328 current_us_started_at, 1329 id_when_started, 1330 latest_repaired_i, 1331 }| { 1332 HashMap::from([ 1333 ("current_us_started_at".to_string(), current_us_started_at), 1334 ("id_when_started".to_string(), id_when_started), 1335 ("latest_repaired_i".to_string(), latest_repaired_i), 1336 ]) 1337 }, 1338 ) 1339 .unwrap_or(HashMap::default()); 1340 1341 Ok(StorageStats { 1342 dids, 1343 targetables, 1344 linking_records, 1345 started_at: Some(started_at), 1346 other_data, 1347 }) 1348 } 1349} 1350 1351trait AsRocksKey: Serialize {} 1352trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {} 1353trait AsRocksValue: Serialize {} 1354trait KeyFromRocks: for<'de> Deserialize<'de> {} 1355trait ValueFromRocks: for<'de> Deserialize<'de> {} 1356 1357// did_id table 1358impl AsRocksKey for &Did {} 1359impl AsRocksValue for &DidIdValue {} 1360impl ValueFromRocks for DidIdValue {} 1361 1362// temp 1363impl KeyFromRocks for Did {} 1364impl AsRocksKey for &DidId {} 1365 1366// target_ids table 1367impl AsRocksKey for &TargetKey {} 1368impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {} 1369impl AsRocksValue for &TargetId {} 1370impl KeyFromRocks for TargetKey {} 1371impl ValueFromRocks for TargetId {} 1372 1373// temp? 1374impl KeyFromRocks for TargetId {} 1375impl AsRocksValue for &TargetKey {} 1376 1377// target_links table 1378impl AsRocksKey for &TargetId {} 1379impl AsRocksValue for &TargetLinkers {} 1380impl ValueFromRocks for TargetLinkers {} 1381 1382// record_link_targets table 1383impl AsRocksKey for &RecordLinkKey {} 1384impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {} 1385impl AsRocksValue for &RecordLinkTargets {} 1386impl KeyFromRocks for RecordLinkKey {} 1387impl ValueFromRocks for RecordLinkTargets {} 1388 1389pub fn _bincode_opts() -> impl BincodeOptions { 1390 bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm 1391} 1392fn _rk(k: impl AsRocksKey) -> Vec<u8> { 1393 _bincode_opts().serialize(&k).unwrap() 1394} 1395fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> { 1396 _bincode_opts().serialize(&kp).unwrap() 1397} 1398fn _rv(v: impl AsRocksValue) -> Vec<u8> { 1399 _bincode_opts().serialize(&v).unwrap() 1400} 1401fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> { 1402 Ok(_bincode_opts().deserialize(bytes)?) 1403} 1404fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> { 1405 Ok(_bincode_opts().deserialize(bytes)?) 1406} 1407 1408#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 1409pub struct Collection(pub String); 1410 1411#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 1412pub struct RPath(pub String); 1413 1414#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 1415pub struct RKey(pub String); 1416 1417impl RKey { 1418 fn empty() -> Self { 1419 RKey("".to_string()) 1420 } 1421} 1422 1423// did ids 1424#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 1425pub struct DidId(pub u64); 1426 1427impl DidId { 1428 fn empty() -> Self { 1429 DidId(0) 1430 } 1431 fn is_empty(&self) -> bool { 1432 self.0 == 0 1433 } 1434} 1435 1436#[derive(Debug, Clone, Serialize, Deserialize)] 1437struct DidIdValue(DidId, bool); // active or not 1438 1439impl DidIdValue { 1440 fn did_id(&self) -> DidId { 1441 let Self(id, _) = self; 1442 *id 1443 } 1444} 1445 1446// target ids 1447#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1448struct TargetId(u64); // key 1449 1450#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 1451pub struct Target(pub String); // the actual target/uri 1452 1453// targets (uris, dids, etc.): the reverse index 1454#[derive(Debug, Clone, Serialize, Deserialize)] 1455pub struct TargetKey(pub Target, pub Collection, pub RPath); 1456 1457#[derive(Debug, Default, Serialize, Deserialize)] 1458pub struct TargetLinkers(pub Vec<(DidId, RKey)>); 1459 1460impl TargetLinkers { 1461 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool { 1462 if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) { 1463 *entry = (DidId::empty(), RKey::empty()); 1464 true 1465 } else { 1466 false 1467 } 1468 } 1469 pub fn count(&self) -> (u64, u64) { 1470 // (linkers, deleted links) 1471 let total = self.0.len() as u64; 1472 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64; 1473 let gone = total - alive; 1474 (alive, gone) 1475 } 1476 fn count_distinct_dids(&self) -> u64 { 1477 self.0 1478 .iter() 1479 .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) }) 1480 .collect::<HashSet<_>>() 1481 .len() as u64 1482 } 1483} 1484 1485// forward links to targets so we can delete links 1486#[derive(Debug, Serialize, Deserialize)] 1487struct RecordLinkKey(DidId, Collection, RKey); 1488 1489// does this even work???? 1490#[derive(Debug, Serialize, Deserialize)] 1491struct RecordLinkKeyDidIdPrefix(DidId); 1492 1493#[derive(Debug, Serialize, Deserialize)] 1494struct TargetIdTargetPrefix(Target); 1495 1496#[derive(Debug, Serialize, Deserialize)] 1497struct RecordLinkTarget(RPath, TargetId); 1498 1499#[derive(Debug, Default, Serialize, Deserialize)] 1500struct RecordLinkTargets(Vec<RecordLinkTarget>); 1501 1502impl RecordLinkTargets { 1503 fn with_capacity(cap: usize) -> Self { 1504 Self(Vec::with_capacity(cap)) 1505 } 1506 fn add(&mut self, target: RecordLinkTarget) { 1507 self.0.push(target) 1508 } 1509} 1510 1511#[cfg(test)] 1512mod tests { 1513 use super::super::ActionableEvent; 1514 use super::*; 1515 use links::Link; 1516 use tempfile::tempdir; 1517 1518 #[test] 1519 fn rocks_delete_iterator_regression() -> Result<()> { 1520 let mut store = RocksStorage::new(tempdir()?)?; 1521 1522 // create a link from the deleter account 1523 store.push( 1524 &ActionableEvent::CreateLinks { 1525 record_id: RecordId { 1526 did: "did:plc:will-shortly-delete".into(), 1527 collection: "a.b.c".into(), 1528 rkey: "asdf".into(), 1529 }, 1530 links: vec![CollectedLink { 1531 target: Link::Uri("example.com".into()), 1532 path: ".uri".into(), 1533 }], 1534 }, 1535 0, 1536 )?; 1537 1538 // and a different link from a separate, new account (later in didid prefix iteration) 1539 store.push( 1540 &ActionableEvent::CreateLinks { 1541 record_id: RecordId { 1542 did: "did:plc:someone-else".into(), 1543 collection: "a.b.c".into(), 1544 rkey: "asdf".into(), 1545 }, 1546 links: vec![CollectedLink { 1547 target: Link::Uri("another.example.com".into()), 1548 path: ".uri".into(), 1549 }], 1550 }, 1551 0, 1552 )?; 1553 1554 // now delete the first account (this is where the buggy version explodes) 1555 store.push( 1556 &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()), 1557 0, 1558 )?; 1559 1560 Ok(()) 1561 } 1562 1563 #[test] 1564 fn rocks_prefix_iteration_helper() -> Result<()> { 1565 #[derive(Serialize, Deserialize)] 1566 struct Key(u8, u8); 1567 1568 #[derive(Serialize)] 1569 struct KeyPrefix(u8); 1570 1571 #[derive(Serialize, Deserialize)] 1572 struct Value(()); 1573 1574 impl AsRocksKey for &Key {} 1575 impl AsRocksKeyPrefix<Key> for &KeyPrefix {} 1576 impl AsRocksValue for &Value {} 1577 1578 impl KeyFromRocks for Key {} 1579 impl ValueFromRocks for Value {} 1580 1581 let data = RocksStorage::new(tempdir()?)?; 1582 let cf = data.db.cf_handle(DID_IDS_CF).unwrap(); 1583 let mut batch = WriteBatch::default(); 1584 1585 // not our prefix 1586 batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(()))); 1587 batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(()))); 1588 1589 // our prefix! 1590 for i in 0..=0xFF { 1591 batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(()))); 1592 } 1593 1594 // not our prefix 1595 batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(()))); 1596 batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(()))); 1597 1598 data.db.write(batch)?; 1599 1600 let mut okays: [bool; 256] = [false; 256]; 1601 for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() { 1602 assert!(i < 256); 1603 assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix"); 1604 assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order"); 1605 okays[k.1 as usize] = true; 1606 } 1607 assert!(okays.iter().all(|b| *b), "every key was iterated"); 1608 1609 Ok(()) 1610 } 1611 1612 // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...) 1613}