use super::{
    ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
    StorageStats,
};
use crate::{CountsByCount, Did, RecordId};
use anyhow::{bail, Result};
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use ratelimit::Ratelimiter;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;

static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
// add reverse mappings for targets if this db was running before that was a thing
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";

static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started

#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetIdRepairState {
    /// start time for repair, microseconds timestamp
    current_us_started_at: u64,
    /// id table's latest id when repair started
    id_when_started: u64,
    /// id table id
    latest_repaired_i: u64,
}
impl AsRocksValue for TargetIdRepairState {}
impl ValueFromRocks for TargetIdRepairState {}
// todo: actually understand these options and probably set them better
fn rocks_opts_base() -> Options {
    let mut opts = Options::default();
    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.create_if_missing(true);
    opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
    // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
    opts
}
fn get_db_opts() -> Options {
    let mut opts = rocks_opts_base();
    opts.create_missing_column_families(true);
    opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
    // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
    opts
}
fn get_db_read_opts() -> Options {
    let mut opts = Options::default();
    opts.optimize_for_point_lookup(16_384); // block cache size in MB (run this on big machines)
    opts
}

#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: move seqs here (concat merge op will be fun)
    did_id_table: IdTable<Did, DidIdValue>,
    target_id_table: IdTable<TargetKey, TargetId>,
    is_writer: bool,
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}

trait IdTableValue: ValueFromRocks + Clone {
    fn new(v: u64) -> Self;
    fn id(&self) -> u64;
}
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    name: String,
    id_seq: Arc<AtomicU64>,
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
            if seq_bytes.len() != 8 {
                bail!(
                    "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
                    self.seq_key()
                );
            }
            let mut buf: [u8; 8] = [0; 8];
            seq_bytes.as_slice().read_exact(&mut buf)?;
            let last_seq = u64::from_le_bytes(buf);
            last_seq + 1
        } else {
            1
        };
        self.id_seq.store(priv_id_seq, Ordering::SeqCst);
        Ok(IdTable {
            base: self,
            priv_id_seq,
        })
    }
    fn seq_key(&self) -> Vec<u8> {
        let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
        k.extend(self.name.as_bytes());
        k
    }
}
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    priv_id_seq: u64,
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    #[must_use]
    fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
        IdTableBase::<Orig, IdVal> {
            _key_marker: PhantomData,
            _val_marker: PhantomData,
            name: name.into(),
            id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
        }
    }
    fn get_id_val(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        orig: &Orig,
    ) -> Result<Option<IdVal>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
            Ok(Some(_vr(&_id_bytes)?))
        } else {
            Ok(None)
        }
    }
    fn __get_or_create_id_val<CF>(
        &mut self,
        cf: &CF,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal>
    where
        CF: AsColumnFamilyRef,
    {
        Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
            let prev_priv_seq = self.priv_id_seq;
            self.priv_id_seq += 1;
            let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
            assert_eq!(
                prev_public_seq, prev_priv_seq,
                "public seq may have been modified??"
            );
            let id_value = IdVal::new(self.priv_id_seq);
            batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
            batch.put_cf(cf, _rk(orig), _rv(&id_value));
            id_value
        }))
    }

    fn estimate_count(&self) -> u64 {
        self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
    }

    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? {
            // HACK ish
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}

impl IdTableValue for DidIdValue {
    fn new(v: u64) -> Self {
        DidIdValue(DidId(v), true)
    }
    fn id(&self) -> u64 {
        self.0 .0
    }
}
impl IdTableValue for TargetId {
    fn new(v: u64) -> Self {
        TargetId(v)
    }
    fn id(&self) -> u64 {
        self.0
    }
}

fn now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_micros() as u64
}

impl RocksStorage {
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        Self::describe_metrics();
        let me = RocksStorage::open_readmode(path, false)?;
        me.global_init()?;
        Ok(me)
    }

    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
        RocksStorage::open_readmode(path, true)
    }

    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::setup(DID_IDS_CF);
        let target_id_table = IdTable::setup(TARGET_IDS_CF);

        // note: global stuff like jetstream cursor goes in the default cf
        // these are bonus extra cfs
        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
        };

        let db = Arc::new(db);
        let did_id_table = did_id_table.init(&db)?;
        let target_id_table = target_id_table.init(&db)?;
        Ok(Self {
            db,
            did_id_table,
            target_id_table,
            is_writer: !readonly,
            backup_task: None.into(),
        })
    }

    fn global_init(&self) -> Result<()> {
        let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_none();
        if first_run {
            self.db.put(STARTED_AT_KEY, _rv(now()))?;

            // hack / temporary: if we're a new db, put in a completed repair
            // state so we don't run repairs (repairs are for old-code dbs)
            let completed = TargetIdRepairState {
                id_when_started: 0,
                current_us_started_at: 0,
                latest_repaired_i: 0,
            };
            self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
        }
        Ok(())
    }

    pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
        let mut state = match self
            .db
            .get(TARGET_ID_REPAIR_STATE_KEY)?
            .map(|s| _vr(&s))
            .transpose()?
        {
            Some(s) => s,
            None => TargetIdRepairState {
                id_when_started: self.did_id_table.priv_id_seq,
                current_us_started_at: now(),
                latest_repaired_i: 0,
            },
        };

        eprintln!("initial repair state: {state:?}");

        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();

        let mut iter = self.db.raw_iterator_cf(&cf);
        iter.seek_to_first();

        eprintln!("repair iterator seeked to first key");

        // skip ahead if a previous run already made some progress
        for _ in 0..state.latest_repaired_i {
            iter.next();
        }

        eprintln!(
            "repair iterator skipped to {}th key",
            state.latest_repaired_i
        );

        let mut maybe_done = false;

        let mut write_fast = rocksdb::WriteOptions::default();
        write_fast.set_sync(false);
        write_fast.disable_wal(true);

        while !stay_alive.is_cancelled() && !maybe_done {
            // let mut batch = WriteBatch::default();

            let mut any_written = false;

            for _ in 0..1000 {
                if state.latest_repaired_i % 1_000_000 == 0 {
                    eprintln!("target iter at {}", state.latest_repaired_i);
                }
                state.latest_repaired_i += 1;

                if !iter.valid() {
                    eprintln!("invalid iter, are we done repairing?");
                    maybe_done = true;
                    break;
                }

                // eprintln!("iterator seems to be valid! getting the key...");
                let raw_key = iter.key().unwrap();
                if raw_key.len() == 8 {
                    // eprintln!("found an 8-byte key, skipping it since it's probably an id...");
                    iter.next();
                    continue;
                }
                let target: TargetKey = _kr::<TargetKey>(raw_key)?;
                let target_id: TargetId = _vr(iter.value().unwrap())?;

                self.db
                    .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
                any_written = true;
                iter.next();
            }

            if any_written {
                self.db
                    .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
                std::thread::sleep(breather);
            }
        }

        eprintln!("repair iterator done.");

        // report whether we actually reached the end (vs. were cancelled)
        Ok(maybe_done)
    }

    pub fn start_backup(
        &mut self,
        path: PathBuf,
        auto: Option<(u64, Option<usize>)>,
        stay_alive: CancellationToken,
    ) -> Result<()> {
        let task = if let Some((interval_hrs, copies)) = auto {
            eprintln!("backups: starting background task...");
            self.backup_task(path, interval_hrs, copies, stay_alive)
        } else {
            eprintln!("backups: starting a one-off backup...");
            thread::spawn({
                let db = self.db.clone();
                move || Self::do_backup(db, path)
            })
        };
        self.backup_task = Arc::new(Some(task));
        Ok(())
    }

    fn backup_task(
        &self,
        path: PathBuf,
        interval_hrs: u64,
        copies: Option<usize>,
        stay_alive: CancellationToken,
    ) -> std::thread::JoinHandle<Result<()>> {
        let db = self.db.clone();
        thread::spawn(move || {
            let limit =
                Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
            let minimum_sleep = Duration::from_secs(1);

            'quit: loop {
                if let Err(sleep) = limit.try_wait() {
                    eprintln!("backups: background: next backup scheduled in {sleep:?}");
                    let waiting = Instant::now();
                    loop {
                        // saturating_sub: bare Duration subtraction panics on underflow
                        let remaining = sleep.saturating_sub(waiting.elapsed());
                        if stay_alive.is_cancelled() {
                            break 'quit;
                        } else if remaining.is_zero() {
                            break;
                        } else if remaining < minimum_sleep {
                            thread::sleep(remaining);
                            break;
                        } else {
                            thread::sleep(minimum_sleep);
                        }
                    }
                }
                eprintln!("backups: background: starting backup...");
                if let Err(e) = Self::do_backup(db.clone(), &path) {
                    eprintln!("backups: background: backup failed: {e:?}");
                    // todo: metrics
                } else {
                    eprintln!("backups: background: backup succeeded yay");
                }
                if let Some(copies) = copies {
                    eprintln!("backups: background: trimming to {copies} saved backups...");
                    if let Err(e) = Self::trim_backups(copies, &path) {
                        eprintln!("backups: background: failed to trim backups: {e:?}");
                    } else {
                        eprintln!("backups: background: trimming worked!")
                    }
                }
            }

            Ok(())
        })
    }

    fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        eprintln!("backups: starting backup...");
        let t0 = Instant::now();
        if let Err(e) = engine.create_new_backup(&db) {
            eprintln!("backups: oh no, backup failed: {e:?}");
        } else {
            eprintln!("backups: yay, backup worked?");
        }
        eprintln!(
            "backups: backup finished after {:.2}s",
            t0.elapsed().as_secs_f32()
        );
        Ok(())
    }

    fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        engine.purge_old_backups(num_backups_to_keep)?;
        Ok(())
    }

    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }

    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }

    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }

    fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
    where
        F: FnOnce(DidIdValue) -> Option<DidIdValue>,
    {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(false);
        };
        let Some(new_did_id_value) = update(did_id_value) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
        Ok(true)
    }
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }

    fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
            return Ok(TargetLinkers::default());
        };
        _vr(&linkers_bytes)
    }
    /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
    fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let mut seen = HashSet::new();
        let mut linkers = self.get_target_linkers(target_id)?;
        for (did_id, _) in linkers.0.iter_mut() {
            if seen.contains(did_id) {
                did_id.0 = 0;
            } else {
                seen.insert(*did_id);
            }
        }
        Ok(linkers)
    }
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
    fn update_target_linkers<F>(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        update: F,
    ) -> Result<bool>
    where
        F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
    {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let existing_linkers = self.get_target_linkers(target_id)?;
        let Some(new_linkers) = update(existing_linkers) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
        Ok(true)
    }

    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
    fn get_record_link_targets(
        &self,
        record_link_key: &RecordLinkKey,
    ) -> Result<Option<RecordLinkTargets>> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
            Ok(Some(_vr(&bytes)?))
        } else {
            Ok(None)
        }
    }
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }

    //
    // higher-level event action handlers
    //

    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }

    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }

    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }

    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
}

impl Drop for RocksStorage {
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}

impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}

impl LinkStorage for RocksStorage {
    fn get_cursor(&mut self) -> Result<Option<u64>> {
        self.db
            .get(JETSTREAM_CURSOR_KEY)?
            .map(|b| _vr(&b))
            .transpose()
    }

    fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
        // normal ops
        let mut batch = WriteBatch::default();
        let t0 = Instant::now();
        if let Some(action) = match event {
            ActionableEvent::CreateLinks { record_id, links } => {
                self.add_links(record_id, links, &mut batch)?;
                Some("create_links")
            }
            ActionableEvent::UpdateLinks {
                record_id,
                new_links,
            } => {
                self.remove_links(record_id, &mut batch)?;
                self.add_links(record_id, new_links, &mut batch)?;
                Some("update_links")
            }
            ActionableEvent::DeleteRecord(record_id) => {
                self.remove_links(record_id, &mut batch)?;
                Some("delete_record")
            }
            ActionableEvent::ActivateAccount(did) => {
                self.set_account(did, true, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeactivateAccount(did) => {
                self.set_account(did, false, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
        } {
            let t_read = t0.elapsed();
            batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
            let batch_ops = batch.len();
            self.db.write(batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_read_seconds", "action" => action)
                .record(t_read.as_secs_f64());
            histogram!("storage_rocksdb_action_seconds", "action" => action)
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => action)
                .increment(batch_ops as u64);
        }

        // special metrics for account deletion which can be arbitrarily expensive
        let mut outer_batch = WriteBatch::default();
        let t0 = Instant::now();
        if let ActionableEvent::DeleteAccount(did) = event {
            let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
            let total_batch_ops = inner_batch_ops + outer_batch.len();
            self.db.write(outer_batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
                .increment(total_batch_ops as u64);
            histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
        }

        Ok(())
    }

    fn to_readable(&mut self) -> impl LinkReader {
        let mut readable = self.clone();
        readable.is_writer = false;
        readable
    }
}

impl LinkReader for RocksStorage {
    fn get_many_to_many_counts(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        path_to_other: &str,
        limit: u64,
        after: Option<String>,
        filter_dids: &HashSet<Did>,
        filter_to_targets: &HashSet<String>,
    ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
        let collection = Collection(collection.to_string());
        let path = RPath(path.to_string());

        let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());

        // unfortunately the cursor is a, uh, stringified number.
        // this was easier for the memstore (plain target, not target id), and
        // making it generic is a bit awful.
        // so... parse the number out of a string here :(
        // TODO: this should bubble up to a BAD_REQUEST response
        let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            eprintln!("nothin doin for this target, {target_key:?}");
            return Ok(Default::default());
        };

        let filter_did_ids: HashMap<DidId, bool> = filter_dids
            .iter()
            .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
            .collect::<Result<Vec<DidIdValue>>>()?
            .into_iter()
            .map(|DidIdValue(id, active)| (id, active))
            .collect();

        // stored targets are keyed by triples of (target, collection, path).
        // target filtering only considers the target itself, so we actually
        // need to do a prefix iteration of all target ids for this target and
        // keep them all.
        // i *think* the number of keys at a target prefix should usually be
        // pretty small, so this is hopefully fine. but if it turns out to be
        // large, we can push this filtering back into the main links loop and
        // do forward db queries per backlink to get the raw target back out.
        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
        for t in filter_to_targets {
            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
                filter_to_target_ids.insert(target_id);
            }
        }

        let linkers = self.get_target_linkers(&target_id)?;

        let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();

        for (did_id, rkey) in linkers.0 {
            if did_id.is_empty() {
                continue;
            }

            if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
                continue;
            }

            let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
            let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
                continue;
            };

            let Some(fwd_target) = targets
                .0
                .into_iter()
                .filter_map(|RecordLinkTarget(rpath, target_id)| {
                    if rpath.0 == path_to_other
                        && (filter_to_target_ids.is_empty()
                            || filter_to_target_ids.contains(&target_id))
                    {
                        Some(target_id)
                    } else {
                        None
                    }
                })
                .take(1)
                .next()
            else {
                eprintln!("no forward match");
                continue;
            };

            // small relief: we page over target ids, so we can skip
            // reprocessing previous pages here
            if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
                continue;
            }

            // and we can skip target ids that must be on future pages
            // (this check continues after the did-lookup, which we have to do)
            let page_is_full = grouped_counts.len() as u64 >= limit;
            if page_is_full {
                let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
                if fwd_target > *current_max {
                    continue;
                }
            }

            // bit painful: 2-step lookup to make sure this did is active
            let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
                eprintln!("failed to look up did from did_id {did_id:?}");
                continue;
            };
            let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
                eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                continue;
            };
            if !active {
                continue;
            }

            // page-management, continued
            // if we have a full page, and we're inserting a *new* key less than
            // the current max, then we can evict the current max
            let mut should_evict = false;
            let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
                // this is a *new* key, so kick the max if we're full
                should_evict = page_is_full;
                Default::default()
            });
            entry.0 += 1;
            entry.1.insert(did_id);

            if should_evict {
                grouped_counts.pop_last();
            }
        }

        let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
        for (target_id, (n, dids)) in &grouped_counts {
            let Some(target) = self
                .target_id_table
                .get_val_from_id(&self.db, target_id.0)?
            else {
                eprintln!("failed to look up target from target_id {target_id:?}");
                continue;
            };
            items.push((target.0 .0, *n, dids.len() as u64));
        }

        let next = if grouped_counts.len() as u64 >= limit {
            // yeah.... it's a number saved as a string......sorry
            grouped_counts
                .keys()
                .next_back()
                .map(|k| format!("{}", k.0))
        } else {
            None
        };

        Ok(PagedOrderedCollection { items, next })
    }

    fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );
        if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
            let (alive, _) = self.get_target_linkers(&target_id)?.count();
            Ok(alive)
        } else {
            Ok(0)
        }
    }

    fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );
        if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
            Ok(self.get_target_linkers(&target_id)?.count_distinct_dids())
        } else {
            Ok(0)
        }
    }

    fn get_links(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
        filter_dids: &HashSet<Did>,
    ) -> Result<PagedAppendingCollection<RecordId>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let mut linkers = self.get_target_linkers(&target_id)?;
        if !filter_dids.is_empty() {
            let mut did_filter = HashSet::new();
            for did in filter_dids {
                let Some(DidIdValue(did_id, active)) =
                    self.did_id_table.get_id_val(&self.db, did)?
                else {
                    eprintln!("failed to find a did_id for {did:?}");
                    continue;
                };
                if !active {
                    eprintln!("excluding inactive did from filtered results");
                    continue;
                }
                did_filter.insert(did_id);
            }
            linkers.0.retain(|linker| did_filter.contains(&linker.0));
        }

        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, rkey) in did_id_rkeys {
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(RecordId {
                    did,
                    collection: collection.to_string(),
                    rkey: rkey.0.clone(),
                });
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }

    fn get_distinct_dids(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<Did>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_distinct_target_linkers(&target_id)?;

        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, _) in did_id_rkeys {
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(did);
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }

    fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
        let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let (count, _) = self.get_target_linkers(&target_id)?.count();
            out.entry(collection.into())
                .or_default()
                .insert(path.clone(), count);
        }
        Ok(out)
    }

    fn get_all_counts(
        &self,
        target: &str,
    ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
        let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let target_linkers = self.get_target_linkers(&target_id)?;
            let (records, _) = target_linkers.count();
            let distinct_dids = target_linkers.count_distinct_dids();
            out.entry(collection.into()).or_default().insert(
                path.clone(),
                CountsByCount {
                    records,
                    distinct_dids,
                },
            );
        }
        Ok(out)
    }

    fn get_stats(&self) -> Result<StorageStats> {
        let dids = self.did_id_table.estimate_count();
        let targetables = self.target_id_table.estimate_count();
        let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        let linking_records = self
            .db
            .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
            .map(|s| s.parse::<u64>())
            .transpose()?
            .unwrap_or(0);
        let started_at = self
            .db
            .get(STARTED_AT_KEY)?
            .map(|c| _vr(&c))
            .transpose()?
            .unwrap_or(COZY_FIRST_CURSOR);

        let other_data = self
            .db
            .get(TARGET_ID_REPAIR_STATE_KEY)?
            .map(|s| _vr(&s))
            .transpose()?
            .map(
                |TargetIdRepairState {
                     current_us_started_at,
                     id_when_started,
                     latest_repaired_i,
                 }| {
                    HashMap::from([
                        ("current_us_started_at".to_string(), current_us_started_at),
                        ("id_when_started".to_string(), id_when_started),
                        ("latest_repaired_i".to_string(), latest_repaired_i),
                    ])
                },
            )
            .unwrap_or(HashMap::default());

        Ok(StorageStats {
            dids,
            targetables,
            linking_records,
            started_at: Some(started_at),
            other_data,
        })
    }
}

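// These marker traits tie each table's key/value types to the bincode helpers
// (_rk/_rkp/_rv/_kr/_vr) below: implementing e.g. AsRocksKey for &Did is what
// lets _rk serialize a Did as a key. AsRocksKeyPrefix<K> additionally promises
// that the prefix type's encoding is a byte-wise prefix of K's encoding, which
// prefix_iter_cf relies on for its PrefixRange scans.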
trait AsRocksKey: Serialize {}
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}

// did_id table
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// temp?
impl KeyFromRocks for TargetId {}
impl AsRocksValue for &TargetKey {}

// target_links table
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}

pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
}
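// (With bincode 1.x's varint encoding, assumed here: integers below 251 take a
// single byte, and larger ones get a one-byte size marker followed by big-endian
// bytes, so byte-wise key comparison in the LSM tracks numeric order.)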
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    fn empty() -> Self {
        RKey("".to_string())
    }
}

// did ids
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    fn empty() -> Self {
        DidId(0)
    }
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}

// target ids
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);

impl TargetLinkers {
    fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
        if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) {
            *entry = (DidId::empty(), RKey::empty());
            true
        } else {
            false
        }
    }
    pub fn count(&self) -> (u64, u64) {
        // (linkers, deleted links)
        let total = self.0.len() as u64;
        let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;
        let gone = total - alive;
        (alive, gone)
    }
    fn count_distinct_dids(&self) -> u64 {
        self.0
            .iter()
            .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
            .collect::<HashSet<_>>()
            .len() as u64
    }
}

// forward links to targets so we can delete links
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// does this even work????
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}

#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }
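
    // Illustrative sketch of TargetLinkers bookkeeping: removed linkers are
    // zeroed out rather than spliced from the vec, so they show up as "gone"
    // in count() while the vec keeps its length. (Fixture values are arbitrary.)
    #[test]
    fn target_linkers_zero_out_removed_entries() {
        let mut linkers = TargetLinkers(vec![
            (DidId(1), RKey("a".into())),
            (DidId(2), RKey("b".into())),
            (DidId(1), RKey("c".into())),
        ]);

        assert!(linkers.remove_linker(&DidId(2), &RKey("b".into())));
        assert!(
            !linkers.remove_linker(&DidId(2), &RKey("b".into())),
            "a zeroed entry should not match again"
        );

        let (alive, gone) = linkers.count();
        assert_eq!((alive, gone), (2, 1));
        assert_eq!(linkers.count_distinct_dids(), 1); // only DidId(1) remains
        assert_eq!(linkers.0.len(), 3, "entry was zeroed, not removed");
    }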

    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }
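
    // Illustrative end-to-end sketch of the main write path: create a link,
    // observe the count, delete the record, observe the count drop. Assumes
    // the URI round-trips unchanged through Link::into_string.
    #[test]
    fn rocks_count_roundtrip() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:counter".into(),
                    collection: "a.b.c".into(),
                    rkey: "rkey1".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;
        assert_eq!(store.get_count("example.com", "a.b.c", ".uri")?, 1);

        store.push(
            &ActionableEvent::DeleteRecord(RecordId {
                did: "did:plc:counter".into(),
                collection: "a.b.c".into(),
                rkey: "rkey1".into(),
            }),
            0,
        )?;
        assert_eq!(store.get_count("example.com", "a.b.c", ".uri")?, 0);

        Ok(())
    }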

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
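
    // A first cut at the TODO above, as an illustrative sketch: the bincode
    // encoding of a prefix struct should be a byte-wise prefix of the full
    // key's encoding. Key/KeyPrefix here are local stand-ins, not the real types.
    #[test]
    fn rocks_key_prefix_encoding_is_bytewise_prefix() {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl KeyFromRocks for Key {}

        let prefix_bytes = _rkp(&KeyPrefix(0x02));
        for i in 0..=0xFF {
            let key_bytes = _rk(&Key(0x02, i));
            assert!(
                key_bytes.starts_with(&prefix_bytes),
                "prefix encoding must byte-prefix the key encoding"
            );
        }
    }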
}