// forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
use crate::{CountsByCount, Did, RecordId};
use anyhow::{bail, Result};
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use ratelimit::Ratelimiter;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;

// Column family names. These are persisted inside the database directory, so
// renaming one is a breaking change (note TARGET_LINKERS_CF's stored value
// "target_links" already differs from its constant name).
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

// Default-column-family key under which the jetstream consumer cursor is stored.
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";

33// todo: actually understand and set these options probably better
34fn rocks_opts_base() -> Options {
35 let mut opts = Options::default();
36 opts.set_level_compaction_dynamic_level_bytes(true);
37 opts.create_if_missing(true);
38 opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
39 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
40 // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
41 opts
42}
43fn get_db_opts() -> Options {
44 let mut opts = rocks_opts_base();
45 opts.create_missing_column_families(true);
46 opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
47 // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
48 opts
49}
50fn get_db_read_opts() -> Options {
51 let mut opts = Options::default();
52 opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
53 opts
54}
55
/// RocksDB-backed link storage. Clones share the same underlying db handle
/// (everything is behind `Arc`s); `is_writer` marks the instance that runs
/// flush/cleanup on drop.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    // did -> DidIdValue, WITH a reverse (id -> did) index
    did_id_table: IdTable<Did, DidIdValue, true>,
    // target key -> TargetId, forward-only
    target_id_table: IdTable<TargetKey, TargetId, false>,
    // true for the read-write instance responsible for shutdown cleanup
    is_writer: bool,
    // join handle for the (optional) backup thread, awaited in Drop
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}
64
/// A value stored in an [`IdTable`]: wraps a u64 id assigned from the table's
/// sequence (plus whatever extra payload the concrete type carries).
trait IdTableValue: ValueFromRocks + Clone {
    fn new(v: u64) -> Self;
    fn id(&self) -> u64;
}
/// Pre-initialization state of an [`IdTable`] (see `setup` -> `init`).
///
/// `id_seq` is the shared id-allocation sequence; 0 means "uninitialized",
/// real ids start at 1 (restored from the db in `init`).
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    // doubles as the column family name
    name: String,
    id_seq: Arc<AtomicU64>,
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    /// Column family descriptor for this table; must be passed to the db
    /// open call before `init` is called.
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    /// Finish setup against an opened db: verify our column family exists and
    /// restore the id sequence from its persisted last-used value.
    fn init<const WITH_REVERSE: bool>(
        self,
        db: &DBWithThreadMode<MultiThreaded>,
    ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        // the stored value is the last id handed out, so resume one past it
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
            if seq_bytes.len() != 8 {
                bail!(
                    "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
                    self.seq_key()
                );
            }
            let mut buf: [u8; 8] = [0; 8];
            seq_bytes.as_slice().read_exact(&mut buf)?;
            let last_seq = u64::from_le_bytes(buf);
            last_seq + 1
        } else {
            // fresh table: id 0 stays reserved as "uninitialized"
            1
        };
        self.id_seq.store(priv_id_seq, Ordering::SeqCst);
        Ok(IdTable {
            base: self,
            priv_id_seq,
        })
    }
    /// Default-CF key under which this table's id sequence is persisted.
    fn seq_key(&self) -> Vec<u8> {
        let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
        k.extend(self.name.as_bytes());
        k
    }
}
/// An id-assignment table mapping `Orig` keys to compact u64-based `IdVal`s,
/// optionally maintaining a reverse (id -> `Orig`) index in the same column
/// family (the `WITH_REVERSE` const parameter).
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    // writer-local copy of the sequence, kept in lockstep with base.id_seq;
    // id allocation assumes a single writer (see __get_or_create_id_val)
    priv_id_seq: u64,
}
130impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
131where
132 Orig: KeyFromRocks,
133 for<'v> &'v IdVal: AsRocksValue,
134 for<'k> &'k Orig: AsRocksKey,
135{
136 #[must_use]
137 fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
138 IdTableBase::<Orig, IdVal> {
139 _key_marker: PhantomData,
140 _val_marker: PhantomData,
141 name: name.into(),
142 id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
143 }
144 }
145 fn get_id_val(
146 &self,
147 db: &DBWithThreadMode<MultiThreaded>,
148 orig: &Orig,
149 ) -> Result<Option<IdVal>> {
150 let cf = db.cf_handle(&self.base.name).unwrap();
151 if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
152 Ok(Some(_vr(&_id_bytes)?))
153 } else {
154 Ok(None)
155 }
156 }
157 fn __get_or_create_id_val<CF>(
158 &mut self,
159 cf: &CF,
160 db: &DBWithThreadMode<MultiThreaded>,
161 batch: &mut WriteBatch,
162 orig: &Orig,
163 ) -> Result<IdVal>
164 where
165 CF: AsColumnFamilyRef,
166 {
167 Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
168 let prev_priv_seq = self.priv_id_seq;
169 self.priv_id_seq += 1;
170 let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
171 assert_eq!(
172 prev_public_seq, prev_priv_seq,
173 "public seq may have been modified??"
174 );
175 let id_value = IdVal::new(self.priv_id_seq);
176 batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
177 batch.put_cf(cf, _rk(orig), _rv(&id_value));
178 id_value
179 }))
180 }
181 fn estimate_count(&self) -> u64 {
182 self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183 }
184}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Like the base version, but also writes the reverse (id -> orig) entry
    /// into the SAME column family, keyed by the big-endian id bytes.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    /// Reverse lookup: id -> original key, via the reverse entries written above.
    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? {
            // HACK ish: the value bytes are a serialized KEY (see the reversed
            // put above), so decode with the key deserializer
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}
218impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
219where
220 Orig: KeyFromRocks,
221 for<'v> &'v IdVal: AsRocksValue,
222 for<'k> &'k Orig: AsRocksKey,
223{
224 fn get_or_create_id_val(
225 &mut self,
226 db: &DBWithThreadMode<MultiThreaded>,
227 batch: &mut WriteBatch,
228 orig: &Orig,
229 ) -> Result<IdVal> {
230 let cf = db.cf_handle(&self.base.name).unwrap();
231 self.__get_or_create_id_val(&cf, db, batch, orig)
232 }
233}
234
235impl IdTableValue for DidIdValue {
236 fn new(v: u64) -> Self {
237 DidIdValue(DidId(v), true)
238 }
239 fn id(&self) -> u64 {
240 self.0 .0
241 }
242}
243impl IdTableValue for TargetId {
244 fn new(v: u64) -> Self {
245 TargetId(v)
246 }
247 fn id(&self) -> u64 {
248 self.0
249 }
250}
251
252impl RocksStorage {
253 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254 Self::describe_metrics();
255 RocksStorage::open_readmode(path, false)
256 }
257
258 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
259 RocksStorage::open_readmode(path, true)
260 }
261
    /// Shared open path for writer and read-only modes: build the column
    /// family descriptors, open the db, then restore both id sequences.
    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
        let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);

        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                // new linkers are appended via a merge operator so the write
                // path never has to read-modify-write the whole linker list
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            // final `false`: don't error if a WAL/log file exists — TODO confirm
            // against the rocksdb docs for open_cf_descriptors_read_only
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
        };

        let db = Arc::new(db);
        // init() restores each table's id sequence from the db
        let did_id_table = did_id_table.init(&db)?;
        let target_id_table = target_id_table.init(&db)?;
        Ok(Self {
            db,
            did_id_table,
            target_id_table,
            is_writer: !readonly,
            backup_task: None.into(),
        })
    }
300
    /// Kick off backups to `path`. With `auto = Some((interval_hrs, copies))`
    /// a background thread runs one backup per interval (trimming old backups
    /// to `copies` afterwards); otherwise a single one-off backup runs on a
    /// spawned thread. The join handle is kept so Drop can wait for it.
    /// NOTE(review): calling this again replaces (and detaches) any previously
    /// stored task handle.
    pub fn start_backup(
        &mut self,
        path: PathBuf,
        auto: Option<(u64, Option<usize>)>,
        stay_alive: CancellationToken,
    ) -> Result<()> {
        let task = if let Some((interval_hrs, copies)) = auto {
            eprintln!("backups: starting background task...");
            self.backup_task(path, interval_hrs, copies, stay_alive)
        } else {
            eprintln!("backups: starting a one-off backup...");
            thread::spawn({
                let db = self.db.clone();
                move || Self::do_backup(db, path)
            })
        };
        self.backup_task = Arc::new(Some(task));
        Ok(())
    }
320
321 fn backup_task(
322 &self,
323 path: PathBuf,
324 interval_hrs: u64,
325 copies: Option<usize>,
326 stay_alive: CancellationToken,
327 ) -> std::thread::JoinHandle<Result<()>> {
328 let db = self.db.clone();
329 thread::spawn(move || {
330 let limit =
331 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
332 let minimum_sleep = Duration::from_secs(1);
333
334 'quit: loop {
335 if let Err(sleep) = limit.try_wait() {
336 eprintln!("backups: background: next backup scheduled in {sleep:?}");
337 let waiting = Instant::now();
338 loop {
339 let remaining = sleep - waiting.elapsed();
340 if stay_alive.is_cancelled() {
341 break 'quit;
342 } else if remaining <= Duration::ZERO {
343 break;
344 } else if remaining < minimum_sleep {
345 thread::sleep(remaining);
346 break;
347 } else {
348 thread::sleep(minimum_sleep);
349 }
350 }
351 }
352 eprintln!("backups: background: starting backup...");
353 if let Err(e) = Self::do_backup(db.clone(), &path) {
354 eprintln!("backups: background: backup failed: {e:?}");
355 // todo: metrics
356 } else {
357 eprintln!("backups: background: backup succeeded yay");
358 }
359 if let Some(copies) = copies {
360 eprintln!("backups: background: trimming to {copies} saved backups...");
361 if let Err(e) = Self::trim_backups(copies, &path) {
362 eprintln!("backups: background: failed to trim backups: {e:?}");
363 } else {
364 eprintln!("backups: background: trimming worked!")
365 }
366 }
367 }
368
369 Ok(())
370 })
371 }
372
373 fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
374 let mut engine =
375 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
376 eprintln!("backups: starting backup...");
377 let t0 = Instant::now();
378 if let Err(e) = engine.create_new_backup(&db) {
379 eprintln!("backups: oh no, backup failed: {e:?}");
380 } else {
381 eprintln!("backups: yay, backup worked?");
382 }
383 eprintln!(
384 "backups: backup finished after {:.2}s",
385 t0.elapsed().as_secs_f32()
386 );
387 Ok(())
388 }
389
390 fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
391 let mut engine =
392 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
393 engine.purge_old_backups(num_backups_to_keep)?;
394 Ok(())
395 }
396
    /// Register units/descriptions for the metrics emitted by `push` and
    /// `delete_account`. Called once from `new`.
    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }
419
    /// Associative merge operator for TARGET_LINKERS_CF: concatenates every
    /// operand's (DidId, RKey) linker list onto the existing list.
    /// Entries that fail to deserialize are logged and DROPPED so the merge
    /// itself still succeeds — data loss is preferred over a failed merge here.
    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        // start from the existing list (if any), pre-sized for the new operands
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }
459
    /// Iterate decoded (key, value) pairs in `cf` whose keys start with `pre`.
    /// Note the `map_while`s: iteration silently STOPS at the first rocksdb
    /// error or the first entry that fails to decode, truncating the scan.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }
478
479 fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
480 where
481 F: FnOnce(DidIdValue) -> Option<DidIdValue>,
482 {
483 let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
484 let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
485 return Ok(false);
486 };
487 let Some(new_did_id_value) = update(did_id_value) else {
488 return Ok(false);
489 };
490 batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
491 Ok(true)
492 }
493 fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
494 let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
495 batch.delete_cf(&cf, _rk(did));
496 }
497
498 fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
499 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
500 let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
501 return Ok(TargetLinkers::default());
502 };
503 _vr(&linkers_bytes)
504 }
505 /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
506 fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
507 let mut seen = HashSet::new();
508 let mut linkers = self.get_target_linkers(target_id)?;
509 for (did_id, _) in linkers.0.iter_mut() {
510 if seen.contains(did_id) {
511 did_id.0 = 0;
512 } else {
513 seen.insert(*did_id);
514 }
515 }
516 Ok(linkers)
517 }
518 fn merge_target_linker(
519 &self,
520 batch: &mut WriteBatch,
521 target_id: &TargetId,
522 linker_did_id: &DidId,
523 linker_rkey: &RKey,
524 ) {
525 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
526 batch.merge_cf(
527 &cf,
528 _rk(target_id),
529 _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
530 );
531 }
532 fn update_target_linkers<F>(
533 &self,
534 batch: &mut WriteBatch,
535 target_id: &TargetId,
536 update: F,
537 ) -> Result<bool>
538 where
539 F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
540 {
541 let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
542 let existing_linkers = self.get_target_linkers(target_id)?;
543 let Some(new_linkers) = update(existing_linkers) else {
544 return Ok(false);
545 };
546 batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
547 Ok(true)
548 }
549
    /// Blind-write the forward (record -> link targets) entry for one record.
    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
566 fn get_record_link_targets(
567 &self,
568 record_link_key: &RecordLinkKey,
569 ) -> Result<Option<RecordLinkTargets>> {
570 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
571 if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
572 Ok(Some(_vr(&bytes)?))
573 } else {
574 Ok(None)
575 }
576 }
577 fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
578 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
579 batch.delete_cf(&cf, _rk(record_link_key));
580 }
581 fn iter_links_for_did_id(
582 &self,
583 did_id: &DidId,
584 ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
585 let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
586 self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
587 }
588 fn iter_targets_for_target(
589 &self,
590 target: &Target,
591 ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
592 let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
593 self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
594 }
595
596 //
597 // higher-level event action handlers
598 //
599
    /// Record every link created by one record: assign ids as needed, merge
    /// the record's (did, rkey) into each target's reverse-linker list, and
    /// blind-write the forward entry used later to process deletes.
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        // the did gets an id assigned eagerly; its active flag is ignored here
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            // targets are keyed by (target, collection, path) so counts can be
            // broken out per collection and path
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }
634
    /// Remove all links previously recorded for a record: look up the forward
    /// entry to find its targets, strip this (did, rkey) from each target's
    /// linker list, then delete the forward entry itself.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }
668
669 fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
670 // this needs to be read-modify-write since the did_id needs to stay the same,
671 // which has a benefit of allowing to avoid adding entries for dids we don't
672 // need. reading on dids needs to be cheap anyway for the current design, and
673 // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
674 self.update_did_id_value(batch, did, |current_value| {
675 Some(DidIdValue(current_value.did_id(), active))
676 })?;
677 Ok(())
678 }
679
    /// Delete a did and every link it created, returning how many ops were
    /// written via the internal mini-batches (for metrics). The did-id entry
    /// delete rides on the caller's `batch`, which is only written after this
    /// returns, so a crash mid-way leaves the did (and a retry path) intact.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                // scrub this did out of every target's reverse-linker list
                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
714}
715
impl Drop for RocksStorage {
    /// Writer shutdown: flush the WAL and memtables, join any backup task,
    /// then stop rocksdb background work. Read-only clones skip all of it.
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            // get_mut only succeeds if no other clone still holds the Arc
            // (e.g. a reader produced by to_readable)
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}
745
// the jetstream cursor (a bare u64) round-trips through the default column family
impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}
748
749impl LinkStorage for RocksStorage {
750 fn get_cursor(&mut self) -> Result<Option<u64>> {
751 self.db
752 .get(JETSTREAM_CURSOR_KEY)?
753 .map(|b| _vr(&b))
754 .transpose()
755 }
756
    /// Apply one firehose event, persisting the cursor atomically with the
    /// event's writes. Account deletion runs through its own batches (see
    /// `delete_account`) and is metered separately since it can be arbitrarily
    /// expensive. NOTE(review): the cursor is only written on the normal-op
    /// path — DeleteAccount events do not advance the stored cursor; confirm
    /// whether that's intentional (it means replay-after-restart for them).
    fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
        // normal ops
        let mut batch = WriteBatch::default();
        let t0 = Instant::now();
        if let Some(action) = match event {
            ActionableEvent::CreateLinks { record_id, links } => {
                self.add_links(record_id, links, &mut batch)?;
                Some("create_links")
            }
            ActionableEvent::UpdateLinks {
                record_id,
                new_links,
            } => {
                // update = remove everything old, then add the new set
                self.remove_links(record_id, &mut batch)?;
                self.add_links(record_id, new_links, &mut batch)?;
                Some("update_links")
            }
            ActionableEvent::DeleteRecord(record_id) => {
                self.remove_links(record_id, &mut batch)?;
                Some("delete_record")
            }
            ActionableEvent::ActivateAccount(did) => {
                self.set_account(did, true, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeactivateAccount(did) => {
                self.set_account(did, false, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
        } {
            let t_read = t0.elapsed();
            batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
            let batch_ops = batch.len();
            self.db.write(batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_read_seconds", "action" => action)
                .record(t_read.as_secs_f64());
            histogram!("storage_rocksdb_action_seconds", "action" => action)
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => action)
                .increment(batch_ops as u64);
        }

        // special metrics for account deletion which can be arbitrarily expensive
        let mut outer_batch = WriteBatch::default();
        let t0 = Instant::now();
        if let ActionableEvent::DeleteAccount(did) = event {
            let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
            let total_batch_ops = inner_batch_ops + outer_batch.len();
            self.db.write(outer_batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
                .increment(total_batch_ops as u64);
            histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
        }

        Ok(())
    }
820
821 fn to_readable(&mut self) -> impl LinkReader {
822 let mut readable = self.clone();
823 readable.is_writer = false;
824 readable
825 }
826}
827
828impl LinkReader for RocksStorage {
829 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830 let target_key = TargetKey(
831 Target(target.to_string()),
832 Collection(collection.to_string()),
833 RPath(path.to_string()),
834 );
835 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
836 let (alive, _) = self.get_target_linkers(&target_id)?.count();
837 Ok(alive)
838 } else {
839 Ok(0)
840 }
841 }
842
843 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
844 let target_key = TargetKey(
845 Target(target.to_string()),
846 Collection(collection.to_string()),
847 RPath(path.to_string()),
848 );
849 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
850 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids())
851 } else {
852 Ok(0)
853 }
854 }
855
    /// Page (newest first) through the records linking to a target. `until` is
    /// the index bound carried over from a previous page's `next`. Deleted /
    /// zeroed linkers and inactive dids are filtered AFTER slicing, so a page
    /// may contain fewer than `limit` items.
    fn get_links(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<RecordId>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            // unknown target: empty page
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_target_linkers(&target_id)?;

        // linkers is append-only, so paging from the end backwards is stable
        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, rkey) in did_id_rkeys {
            // did_id 0 marks a deleted (or deduped) linker
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(RecordId {
                    did,
                    collection: collection.to_string(),
                    rkey: rkey.0.clone(),
                });
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }
921
    /// Page (newest first) through the distinct dids linking to a target.
    /// Uses the "distinct" linker view, where duplicate dids are zeroed and
    /// then skipped just like deleted linkers.
    fn get_distinct_dids(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<Did>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            // unknown target: empty page
            return Ok(PagedAppendingCollection {
                version: (0, 0),
                items: Vec::new(),
                next: None,
                total: 0,
            });
        };

        let linkers = self.get_distinct_target_linkers(&target_id)?;

        // linkers is append-only, so paging from the end backwards is stable
        let (alive, gone) = linkers.count();
        let total = alive + gone;
        let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        let begin = end.saturating_sub(limit as usize);
        let next = if begin == 0 { None } else { Some(begin as u64) };

        let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, _) in did_id_rkeys {
            // did_id 0 marks a deleted or duplicate linker
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(did);
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next,
            total: alive,
        })
    }
983
984 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
985 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
986 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
987 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
988 let (count, _) = self.get_target_linkers(&target_id)?.count();
989 out.entry(collection.into())
990 .or_default()
991 .insert(path.clone(), count);
992 }
993 Ok(out)
994 }
995
996 fn get_all_counts(
997 &self,
998 target: &str,
999 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
1000 let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
1001 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
1002 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
1003 let target_linkers = self.get_target_linkers(&target_id)?;
1004 let (records, _) = target_linkers.count();
1005 let distinct_dids = target_linkers.count_distinct_dids();
1006 out.entry(collection.into()).or_default().insert(
1007 path.clone(),
1008 CountsByCount {
1009 records,
1010 distinct_dids,
1011 },
1012 );
1013 }
1014 Ok(out)
1015 }
1016
1017 fn get_stats(&self) -> Result<StorageStats> {
1018 let dids = self.did_id_table.estimate_count();
1019 let targetables = self.target_id_table.estimate_count();
1020 let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
1021 let linking_records = self
1022 .db
1023 .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
1024 .map(|s| s.parse::<u64>())
1025 .transpose()?
1026 .unwrap_or(0);
1027 Ok(StorageStats {
1028 dids,
1029 targetables,
1030 linking_records,
1031 })
1032 }
1033}
1034
// Marker traits tying domain types to their on-disk roles. Everything is
// (de)serialized with the big-endian bincode options below, so types used as
// keys must encode with a stable, order-preserving byte layout.
trait AsRocksKey: Serialize {}
// A serializable prefix of a full key `K`; used for rocks prefix iteration.
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}
1040
// did_id table: Did -> DidIdValue
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp: reverse direction (DidId as key, Did read back out) — presumably for
// iteration/cleanup paths; confirm before removing
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table: TargetKey -> TargetId, prefix-scannable by Target
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// target_links table: TargetId -> TargetLinkers (the reverse index payload)
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table: RecordLinkKey -> RecordLinkTargets,
// prefix-scannable by DidId (the forward index used when deleting)
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}
1068
/// Shared bincode config for every rocks key/value. Big-endian integer
/// encoding keeps numeric key prefixes in natural byte-sort order in the LSM,
/// which is what makes prefix iteration over serialized keys behave.
pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
}
// key -> rocks bytes (unwrap: serializing our own in-memory types)
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
// key prefix -> rocks bytes; the type parameter ties a prefix to its full key
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
// value -> rocks bytes
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
// rocks bytes -> key (fallible: bytes come back from the db)
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
// rocks bytes -> value (fallible: bytes come back from the db)
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
1087
// Record collection, NSID-shaped (tests use e.g. "a.b.c").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

// Path within a record where a link was found (tests use e.g. ".uri").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

// Record key of the linking record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    // Sentinel written over tombstoned linker slots (see TargetLinkers::remove_linker).
    fn empty() -> Self {
        RKey("".to_string())
    }
}
1102
// did ids
/// Compact numeric id assigned to a Did. Id 0 is reserved as the tombstone
/// sentinel (`empty`/`is_empty`) used when a linker entry is deleted in place.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    // the tombstone sentinel
    fn empty() -> Self {
        DidId(0)
    }
    // true iff this is the tombstone sentinel (id 0)
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

// Value stored in the did_id table: the assigned id plus whether the account
// is currently active (inactive accounts are filtered out of linker listings).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    // the assigned id, ignoring the active flag
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}
1125
// target ids
/// Compact numeric id assigned to a TargetKey.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
// Full key: (target, collection, path) — Target first so prefix iteration by
// Target finds all collection/path variants (see TargetIdTargetPrefix).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

/// Everyone linking to one target: (DidId, RKey) pairs in insertion order.
/// Deleted linkers are tombstoned in place (DidId 0 / empty RKey) rather than
/// removed, so existing positions stay stable for paged reads.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);
1139
1140impl TargetLinkers {
1141 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
1142 if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) {
1143 *entry = (DidId::empty(), RKey::empty());
1144 true
1145 } else {
1146 false
1147 }
1148 }
1149 pub fn count(&self) -> (u64, u64) {
1150 // (linkers, deleted links)
1151 let total = self.0.len() as u64;
1152 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;
1153 let gone = total - alive;
1154 (alive, gone)
1155 }
1156 fn count_distinct_dids(&self) -> u64 {
1157 self.0
1158 .iter()
1159 .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
1160 .collect::<HashSet<_>>()
1161 .len() as u64
1162 }
1163}
1164
// forward links to targets so we can delete links
// Key: (did, collection, rkey) — DidId first so all of an account's records
// can be prefix-scanned when the account is deleted.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// Prefix type for RecordLinkKey scans by DidId. NOTE(review): this relies on
// bincode encoding a lone first field byte-identically to the same field at
// the head of the full key — exercised only indirectly by the prefix-iteration
// test below; see the TODO at the bottom of the tests module.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

// Prefix type for TargetKey scans by Target (same bincode-prefix assumption).
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

// One link found in a record: where in the record it was (path) plus the
// interned id of what it points at.
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

// All links found in a single record.
#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}
1190
#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    // Regression: deleting an account while iterating its record-link keys by
    // DidId prefix must not blow up when other accounts' keys follow in
    // iteration order.
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    // Checks that prefix_iter_cf yields exactly the keys sharing the given
    // bincode-encoded prefix, in encoded byte order, and nothing outside it.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        // wire the local test types into the rocks marker traits
        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix! all 256 possible second bytes
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        // every 0x02-prefixed key must appear exactly once, in order
        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}