// forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
2use crate::{CountsByCount, Did, RecordId};
3use anyhow::{bail, Result};
4use bincode::Options as BincodeOptions;
5use links::CollectedLink;
6use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
7use ratelimit::Ratelimiter;
8use rocksdb::backup::{BackupEngine, BackupEngineOptions};
9use rocksdb::{
10 AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
11 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12};
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet};
15use std::io::Read;
16use std::marker::PhantomData;
17use std::path::{Path, PathBuf};
18use std::sync::{
19 atomic::{AtomicU64, Ordering},
20 Arc,
21};
22use std::thread;
23use std::time::{Duration, Instant};
24use tokio_util::sync::CancellationToken;
25
// Column family names. NOTE(review): TARGET_LINKERS_CF's on-disk name is
// "target_links" even though the constant says "linkers" -- the string must
// stay stable for existing databases, only the constant name may change.
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

// Default-CF key under which the jetstream consumer cursor is persisted.
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
32
33// todo: actually understand and set these options probably better
34fn rocks_opts_base() -> Options {
35 let mut opts = Options::default();
36 opts.set_level_compaction_dynamic_level_bytes(true);
37 opts.create_if_missing(true);
38 opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
39 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
40 // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
41 opts
42}
43fn get_db_opts() -> Options {
44 let mut opts = rocks_opts_base();
45 opts.create_missing_column_families(true);
46 opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
47 // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
48 opts
49}
50fn get_db_read_opts() -> Options {
51 let mut opts = Options::default();
52 opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
53 opts
54}
55
/// Rocksdb-backed link store. Cloning shares the same `Arc`ed db handle; a
/// clone can be demoted to a reader via `LinkStorage::to_readable`.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    // did -> DidIdValue, with a reverse (id -> did) index (`true`)
    did_id_table: IdTable<Did, DidIdValue, true>,
    // target key -> TargetId, forward-only (`false`)
    target_id_table: IdTable<TargetKey, TargetId, false>,
    // writers flush and join the backup task on drop; readers skip cleanup
    is_writer: bool,
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}
64
/// Value side of an id table: a wrapper around a u64 id handed out from a
/// per-table sequence.
trait IdTableValue: ValueFromRocks + Clone {
    // construct the value for a freshly assigned sequence number
    fn new(v: u64) -> Self;
    // extract the raw id back out
    fn id(&self) -> u64;
}
/// Pre-`init` state for an id table: the column family name plus the shared
/// sequence counter (0 until `init` loads or starts the sequence).
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    name: String, // column family name; also mixed into the seq key
    id_seq: Arc<AtomicU64>, // publicly readable copy of the sequence
}
80impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
81where
82 Orig: KeyFromRocks,
83 for<'a> &'a Orig: AsRocksKey,
84{
85 fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
86 ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
87 }
88 fn init<const WITH_REVERSE: bool>(
89 self,
90 db: &DBWithThreadMode<MultiThreaded>,
91 ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
92 if db.cf_handle(&self.name).is_none() {
93 bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
94 }
95 let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
96 if seq_bytes.len() != 8 {
97 bail!(
98 "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
99 self.seq_key()
100 );
101 }
102 let mut buf: [u8; 8] = [0; 8];
103 seq_bytes.as_slice().read_exact(&mut buf)?;
104 let last_seq = u64::from_le_bytes(buf);
105 last_seq + 1
106 } else {
107 1
108 };
109 self.id_seq.store(priv_id_seq, Ordering::SeqCst);
110 Ok(IdTable {
111 base: self,
112 priv_id_seq,
113 })
114 }
115 fn seq_key(&self) -> Vec<u8> {
116 let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
117 k.extend(self.name.as_bytes());
118 k
119 }
120}
/// Maps original keys to compact u64-backed ids, optionally maintaining a
/// reverse id -> original index when `WITH_REVERSE` is true.
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    // writer-private view of the sequence; kept in lockstep with base.id_seq
    priv_id_seq: u64,
}
130impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
131where
132 Orig: KeyFromRocks,
133 for<'v> &'v IdVal: AsRocksValue,
134 for<'k> &'k Orig: AsRocksKey,
135{
136 #[must_use]
137 fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
138 IdTableBase::<Orig, IdVal> {
139 _key_marker: PhantomData,
140 _val_marker: PhantomData,
141 name: name.into(),
142 id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
143 }
144 }
145 fn get_id_val(
146 &self,
147 db: &DBWithThreadMode<MultiThreaded>,
148 orig: &Orig,
149 ) -> Result<Option<IdVal>> {
150 let cf = db.cf_handle(&self.base.name).unwrap();
151 if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
152 Ok(Some(_vr(&_id_bytes)?))
153 } else {
154 Ok(None)
155 }
156 }
157 fn __get_or_create_id_val<CF>(
158 &mut self,
159 cf: &CF,
160 db: &DBWithThreadMode<MultiThreaded>,
161 batch: &mut WriteBatch,
162 orig: &Orig,
163 ) -> Result<IdVal>
164 where
165 CF: AsColumnFamilyRef,
166 {
167 Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
168 let prev_priv_seq = self.priv_id_seq;
169 self.priv_id_seq += 1;
170 let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
171 assert_eq!(
172 prev_public_seq, prev_priv_seq,
173 "public seq may have been modified??"
174 );
175 let id_value = IdVal::new(self.priv_id_seq);
176 batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
177 batch.put_cf(cf, _rk(orig), _rv(&id_value));
178 id_value
179 }))
180 }
181 fn estimate_count(&self) -> u64 {
182 self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183 }
184}
// Tables with a reverse index: ids can be mapped back to their original keys.
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Look up or assign an id for `orig`, also staging the reverse
    /// (big-endian id -> orig) entry in the same column family.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    /// Reverse lookup: the original key for `id`, if present.
    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? {
            // HACK ish
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}
// Forward-only tables: same as above minus the reverse-index write.
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Look up or assign an id for `orig`, staging writes into `batch`.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        self.__get_or_create_id_val(&cf, db, batch, orig)
    }
}
234
impl IdTableValue for DidIdValue {
    // freshly assigned dids start out marked active (the bool)
    fn new(v: u64) -> Self {
        DidIdValue(DidId(v), true)
    }
    fn id(&self) -> u64 {
        self.0 .0
    }
}
impl IdTableValue for TargetId {
    // targets carry no extra state beyond the id itself
    fn new(v: u64) -> Self {
        TargetId(v)
    }
    fn id(&self) -> u64 {
        self.0
    }
}
251
252impl RocksStorage {
253 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254 Self::describe_metrics();
255 RocksStorage::open_readmode(path, false)
256 }
257
258 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
259 RocksStorage::open_readmode(path, true)
260 }
261
    /// Open the database at `path` with all column families registered, then
    /// initialize both id tables (loading their persisted sequence counters).
    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
        let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);

        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                // appends to a target's linker list go through this merge
                // operator instead of read-modify-write
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            // NOTE(review): the trailing `false` is presumably
            // error_if_log_file_exist -- confirm against the rocksdb docs
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
        };

        let db = Arc::new(db);
        let did_id_table = did_id_table.init(&db)?;
        let target_id_table = target_id_table.init(&db)?;
        Ok(Self {
            db,
            did_id_table,
            target_id_table,
            is_writer: !readonly,
            backup_task: None.into(),
        })
    }
300
301 pub fn start_backup(
302 &mut self,
303 path: PathBuf,
304 auto: Option<(u64, Option<usize>)>,
305 stay_alive: CancellationToken,
306 ) -> Result<()> {
307 let task = if let Some((interval_hrs, copies)) = auto {
308 eprintln!("backups: starting background task...");
309 self.backup_task(path, interval_hrs, copies, stay_alive)
310 } else {
311 eprintln!("backups: starting a one-off backup...");
312 thread::spawn({
313 let db = self.db.clone();
314 move || Self::do_backup(db, path)
315 })
316 };
317 self.backup_task = Arc::new(Some(task));
318 Ok(())
319 }
320
321 fn backup_task(
322 &self,
323 path: PathBuf,
324 interval_hrs: u64,
325 copies: Option<usize>,
326 stay_alive: CancellationToken,
327 ) -> std::thread::JoinHandle<Result<()>> {
328 let db = self.db.clone();
329 thread::spawn(move || {
330 let limit =
331 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
332 let minimum_sleep = Duration::from_secs(1);
333
334 'quit: loop {
335 if let Err(sleep) = limit.try_wait() {
336 eprintln!("backups: background: next backup scheduled in {sleep:?}");
337 let waiting = Instant::now();
338 loop {
339 let remaining = sleep - waiting.elapsed();
340 if stay_alive.is_cancelled() {
341 break 'quit;
342 } else if remaining <= Duration::ZERO {
343 break;
344 } else if remaining < minimum_sleep {
345 thread::sleep(remaining);
346 break;
347 } else {
348 thread::sleep(minimum_sleep);
349 }
350 }
351 }
352 eprintln!("backups: background: starting backup...");
353 if let Err(e) = Self::do_backup(db.clone(), &path) {
354 eprintln!("backups: background: backup failed: {e:?}");
355 // todo: metrics
356 } else {
357 eprintln!("backups: background: backup succeeded yay");
358 }
359 if let Some(copies) = copies {
360 eprintln!("backups: background: trimming to {copies} saved backups...");
361 if let Err(e) = Self::trim_backups(copies, &path) {
362 eprintln!("backups: background: failed to trim backups: {e:?}");
363 } else {
364 eprintln!("backups: background: trimming worked!")
365 }
366 }
367 }
368
369 Ok(())
370 })
371 }
372
    /// Run one backup of `db` into the backup directory at `path`.
    /// NOTE(review): a failed `create_new_backup` is only logged -- this fn
    /// still returns Ok, so callers' error branches fire only when opening the
    /// backup engine itself fails. Confirm that's intended.
    fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        eprintln!("backups: starting backup...");
        let t0 = Instant::now();
        if let Err(e) = engine.create_new_backup(&db) {
            eprintln!("backups: oh no, backup failed: {e:?}");
        } else {
            eprintln!("backups: yay, backup worked?");
        }
        eprintln!(
            "backups: backup finished after {:.2}s",
            t0.elapsed().as_secs_f32()
        );
        Ok(())
    }
389
390 fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
391 let mut engine =
392 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
393 engine.purge_old_backups(num_backups_to_keep)?;
394 Ok(())
395 }
396
    /// Register human-readable descriptions/units for the metrics this module
    /// records (cosmetic only; recording works without it).
    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }
419
    /// Associative merge operator for TARGET_LINKERS_CF: concatenates the
    /// `TargetLinkers` lists from the existing value and every operand.
    /// Undecodable values are logged and dropped rather than failing the merge
    /// (NOTE(review): presumably because failing would leave the key unusable
    /// -- data loss is the chosen lesser evil here).
    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        // start from the existing linker list, if it decodes
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        // append each operand's linkers in order
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }
459
    /// Iterate decoded `(key, value)` pairs in `cf` whose keys start with the
    /// serialized prefix `pre`. NOTE(review): both `map_while` calls stop the
    /// iteration at the first rocksdb error or undecodable entry instead of
    /// skipping it -- confirm silent truncation is intended.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }
478
479 fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
480 where
481 F: FnOnce(DidIdValue) -> Option<DidIdValue>,
482 {
483 let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
484 let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
485 return Ok(false);
486 };
487 let Some(new_did_id_value) = update(did_id_value) else {
488 return Ok(false);
489 };
490 batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
491 Ok(true)
492 }
    /// Stage removal of the did -> DidIdValue entry. NOTE(review): the reverse
    /// (id -> did) entry is not removed here; see the TODO in `delete_account`.
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }
497
498 fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
499 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
500 let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
501 return Ok(TargetLinkers::default());
502 };
503 _vr(&linkers_bytes)
504 }
505 /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
506 fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
507 let mut seen = HashSet::new();
508 let mut linkers = self.get_target_linkers(target_id)?;
509 for (did_id, _) in linkers.0.iter_mut() {
510 if seen.contains(did_id) {
511 did_id.0 = 0;
512 } else {
513 seen.insert(*did_id);
514 }
515 }
516 Ok(linkers)
517 }
    /// Stage a merge that appends one (did, rkey) linker to a target's linker
    /// list -- goes through the CF's merge operator, so no read is needed.
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
532 fn update_target_linkers<F>(
533 &self,
534 batch: &mut WriteBatch,
535 target_id: &TargetId,
536 update: F,
537 ) -> Result<bool>
538 where
539 F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
540 {
541 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
542 let existing_linkers = self.get_target_linkers(target_id)?;
543 let Some(new_linkers) = update(existing_linkers) else {
544 return Ok(false);
545 };
546 batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
547 Ok(true)
548 }
549
    /// Blind-write the forward (record -> targets) entry used to service
    /// deletes later.
    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
566 fn get_record_link_targets(
567 &self,
568 record_link_key: &RecordLinkKey,
569 ) -> Result<Option<RecordLinkTargets>> {
570 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
571 if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
572 Ok(Some(_vr(&bytes)?))
573 } else {
574 Ok(None)
575 }
576 }
    /// Stage removal of a record's forward (record -> targets) entry.
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    /// Iterate every forward link entry created by one did (keys in
    /// LINK_TARGETS_CF are prefixed by the DidId).
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    /// Iterate all (collection, path) variants of a target (keys in
    /// TARGET_IDS_CF are prefixed by the Target itself).
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }
595
596 //
597 // higher-level event action handlers
598 //
599
    /// Stage all writes for a record's links: assign ids as needed, merge the
    /// linker into each target's reverse index, and store the forward
    /// record -> targets entry (needed later for deletes).
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            // targets are keyed by (target, collection, path-within-record)
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            // reverse index: target -> (did, rkey), via the merge operator
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }
634
    /// Stage removal of all links created by one record: scrub the linker out
    /// of each target's reverse index, then drop the forward entry.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }
668
    /// Stage an account active/inactive flag change; unknown dids are ignored.
    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }
679
    /// Stage deletion of a did's id mapping into `batch`, and directly write
    /// out (in chunked mini-batches) the removal of all their links. Returns
    /// the number of ops written through the mini-batches.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
714}
715
impl Drop for RocksStorage {
    /// Writer shutdown: flush WAL + memtables, join any backup thread, then
    /// stop background compaction. Readers (and demoted clones) skip all of it.
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            // get_mut only succeeds if we hold the last Arc to the task handle
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}
745
// the jetstream cursor is stored as a bare u64 value
impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}
748
749impl LinkStorage for RocksStorage {
750 fn get_cursor(&mut self) -> Result<Option<u64>> {
751 self.db
752 .get(JETSTREAM_CURSOR_KEY)?
753 .map(|b| _vr(&b))
754 .transpose()
755 }
756
757 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
758 // normal ops
759 let mut batch = WriteBatch::default();
760 let t0 = Instant::now();
761 if let Some(action) = match event {
762 ActionableEvent::CreateLinks { record_id, links } => {
763 self.add_links(record_id, links, &mut batch)?;
764 Some("create_links")
765 }
766 ActionableEvent::UpdateLinks {
767 record_id,
768 new_links,
769 } => {
770 self.remove_links(record_id, &mut batch)?;
771 self.add_links(record_id, new_links, &mut batch)?;
772 Some("update_links")
773 }
774 ActionableEvent::DeleteRecord(record_id) => {
775 self.remove_links(record_id, &mut batch)?;
776 Some("delete_record")
777 }
778 ActionableEvent::ActivateAccount(did) => {
779 self.set_account(did, true, &mut batch)?;
780 Some("set_account_status")
781 }
782 ActionableEvent::DeactivateAccount(did) => {
783 self.set_account(did, false, &mut batch)?;
784 Some("set_account_status")
785 }
786 ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
787 } {
788 let t_read = t0.elapsed();
789 batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
790 let batch_ops = batch.len();
791 self.db.write(batch)?;
792 let t_total = t0.elapsed();
793
794 histogram!("storage_rocksdb_read_seconds", "action" => action)
795 .record(t_read.as_secs_f64());
796 histogram!("storage_rocksdb_action_seconds", "action" => action)
797 .record(t_total.as_secs_f64());
798 counter!("storage_rocksdb_batch_ops_total", "action" => action)
799 .increment(batch_ops as u64);
800 }
801
802 // special metrics for account deletion which can be arbitrarily expensive
803 let mut outer_batch = WriteBatch::default();
804 let t0 = Instant::now();
805 if let ActionableEvent::DeleteAccount(did) = event {
806 let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
807 let total_batch_ops = inner_batch_ops + outer_batch.len();
808 self.db.write(outer_batch)?;
809 let t_total = t0.elapsed();
810
811 histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
812 .record(t_total.as_secs_f64());
813 counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
814 .increment(total_batch_ops as u64);
815 histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
816 }
817
818 Ok(())
819 }
820
821 fn to_readable(&mut self) -> impl LinkReader {
822 let mut readable = self.clone();
823 readable.is_writer = false;
824 readable
825 }
826}
827
828impl LinkReader for RocksStorage {
829 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830 let target_key = TargetKey(
831 Target(target.to_string()),
832 Collection(collection.to_string()),
833 RPath(path.to_string()),
834 );
835 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
836 let (alive, _) = self.get_target_linkers(&target_id)?.count();
837 Ok(alive)
838 } else {
839 Ok(0)
840 }
841 }
842
843 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
844 let target_key = TargetKey(
845 Target(target.to_string()),
846 Collection(collection.to_string()),
847 RPath(path.to_string()),
848 );
849 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
850 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids())
851 } else {
852 Ok(0)
853 }
854 }
855
    /// Page backwards (newest first) through the records linking to a target.
    /// `until` is an index into the append-only linker list from a previous
    /// page's `next`; inactive and tombstoned (zeroed) linkers are skipped
    /// after slicing, so pages can come back short.
    fn get_links(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
        filter_dids: &HashSet<Did>,
    ) -> Result<PagedAppendingCollection<RecordId>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let mut linkers = self.get_target_linkers(&target_id)?;
        if !filter_dids.is_empty() {
            // resolve the caller's dids to did_ids, dropping unknown/inactive
            // ones, then keep only linkers from that set
            let mut did_filter = HashSet::new();
            for did in filter_dids {
                let Some(DidIdValue(did_id, active)) =
                    self.did_id_table.get_id_val(&self.db, did)?
                else {
                    eprintln!("failed to find a did_id for {did:?}");
                    continue;
                };
                if !active {
                    eprintln!("excluding inactive did from filtered results");
                    continue;
                }
                did_filter.insert(did_id);
            }
            linkers.0.retain(|linker| did_filter.contains(&linker.0));
        }

        // page window: [begin, end) over the append-only list, newest last
        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, rkey) in did_id_rkeys {
            // zeroed did_id is the deleted/duplicate sentinel
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(RecordId {
                    did,
                    collection: collection.to_string(),
                    rkey: rkey.0.clone(),
                });
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }
939
    /// Page backwards (newest first) through the distinct dids linking to a
    /// target. Duplicate dids come back zeroed from
    /// `get_distinct_target_linkers` and are skipped like deleted entries.
    fn get_distinct_dids(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<Did>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_distinct_target_linkers(&target_id)?;

        // page window: [begin, end) over the append-only list, newest last
        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, _) in did_id_rkeys {
            // zeroed did_id is the deleted/duplicate sentinel
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(did);
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }
1001
1002 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
1003 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
1004 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
1005 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
1006 let (count, _) = self.get_target_linkers(&target_id)?.count();
1007 out.entry(collection.into())
1008 .or_default()
1009 .insert(path.clone(), count);
1010 }
1011 Ok(out)
1012 }
1013
1014 fn get_all_counts(
1015 &self,
1016 target: &str,
1017 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
1018 let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
1019 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
1020 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
1021 let target_linkers = self.get_target_linkers(&target_id)?;
1022 let (records, _) = target_linkers.count();
1023 let distinct_dids = target_linkers.count_distinct_dids();
1024 out.entry(collection.into()).or_default().insert(
1025 path.clone(),
1026 CountsByCount {
1027 records,
1028 distinct_dids,
1029 },
1030 );
1031 }
1032 Ok(out)
1033 }
1034
1035 fn get_stats(&self) -> Result<StorageStats> {
1036 let dids = self.did_id_table.estimate_count();
1037 let targetables = self.target_id_table.estimate_count();
1038 let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
1039 let linking_records = self
1040 .db
1041 .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
1042 .map(|s| s.parse::<u64>())
1043 .transpose()?
1044 .unwrap_or(0);
1045 Ok(StorageStats {
1046 dids,
1047 targetables,
1048 linking_records,
1049 })
1050 }
1051}
1052
// Marker traits wiring serde types to their rocksdb roles: the _rk/_rkp/_rv
// helpers below only accept types tagged as keys/prefixes/values, and
// _kr/_vr only decode into types tagged as readable back out.
trait AsRocksKey: Serialize {}
// A prefix type whose bincode encoding is a byte-prefix of full keys `K`.
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}
1058
// did_id table: Did -> DidIdValue
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp: reverse lookups keyed by DidId (DidId -> Did, see get_val_from_id callers)
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table: TargetKey -> TargetId, prefix-scannable by Target
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// target_links table: TargetId -> TargetLinkers
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table: RecordLinkKey -> RecordLinkTargets,
// prefix-scannable by DidId
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}
1086
1087pub fn _bincode_opts() -> impl BincodeOptions {
1088 bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
1089}
1090fn _rk(k: impl AsRocksKey) -> Vec<u8> {
1091 _bincode_opts().serialize(&k).unwrap()
1092}
1093fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
1094 _bincode_opts().serialize(&kp).unwrap()
1095}
1096fn _rv(v: impl AsRocksValue) -> Vec<u8> {
1097 _bincode_opts().serialize(&v).unwrap()
1098}
1099fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
1100 Ok(_bincode_opts().deserialize(bytes)?)
1101}
1102fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
1103 Ok(_bincode_opts().deserialize(bytes)?)
1104}
1105
/// Record collection identifier (e.g. "a.b.c" in the tests below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

/// Path within a record where a link was found (e.g. ".uri").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

/// Record key within a collection.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    // Empty-string sentinel, paired with DidId::empty() to tombstone deleted
    // linker entries in TargetLinkers.
    fn empty() -> Self {
        RKey("".to_string())
    }
}
1120
// did ids
/// Compact numeric stand-in for a Did in keys and values; 0 is reserved as
/// the tombstone sentinel (see `empty`/`is_empty`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);
1124
1125impl DidId {
1126 fn empty() -> Self {
1127 DidId(0)
1128 }
1129 fn is_empty(&self) -> bool {
1130 self.0 == 0
1131 }
1132}
1133
/// did_id table value: the account's DidId plus whether the account is
/// currently active (inactive accounts are filtered from query results).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not
1136
1137impl DidIdValue {
1138 fn did_id(&self) -> DidId {
1139 let Self(id, _) = self;
1140 *id
1141 }
1142}
1143
// target ids
/// Compact numeric stand-in for a TargetKey.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

/// Append-ordered list of (linker did_id, rkey) entries for one target.
/// Deleted links are tombstoned in place as (DidId(0), RKey("")) so that
/// existing indices stay stable for paging.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);
1157
1158impl TargetLinkers {
1159 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
1160 if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) {
1161 *entry = (DidId::empty(), RKey::empty());
1162 true
1163 } else {
1164 false
1165 }
1166 }
1167 pub fn count(&self) -> (u64, u64) {
1168 // (linkers, deleted links)
1169 let total = self.0.len() as u64;
1170 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;
1171 let gone = total - alive;
1172 (alive, gone)
1173 }
1174 fn count_distinct_dids(&self) -> u64 {
1175 self.0
1176 .iter()
1177 .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
1178 .collect::<HashSet<_>>()
1179 .len() as u64
1180 }
1181}
1182
// forward links to targets so we can delete links
/// Key of the record_link_targets table: one record, identified by its
/// owner's did_id, collection, and rkey.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// Prefix for scanning every RecordLinkKey belonging to one DidId. This works
// because big-endian bincode makes the (DidId) encoding a byte-prefix of the
// full (DidId, Collection, RKey) encoding -- exercised by
// tests::rocks_prefix_iteration_helper.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

/// Prefix for scanning every TargetKey that starts with a given Target.
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

/// One link found in a record: the path it was found at and the target's id.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

/// All link targets collected from a single record.
#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);
1199
1200impl RecordLinkTargets {
1201 fn with_capacity(cap: usize) -> Self {
1202 Self(Vec::with_capacity(cap))
1203 }
1204 fn add(&mut self, target: RecordLinkTarget) {
1205 self.0.push(target)
1206 }
1207}
1208
#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    // Regression: handling DeleteAccount while a second account's records sit
    // later in did_id prefix-iteration order must not blow up (a buggy
    // version exploded at the delete step below).
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    // Checks the bincode big-endian encoding gives true byte-prefix
    // semantics: a one-field prefix struct matches exactly the keys whose
    // first field equals it, in order, and nothing outside the prefix.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        // each of the 256 prefixed keys must be seen exactly once, in order
        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}