forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId};
2use anyhow::Result;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5
6pub mod mem_store;
7pub use mem_store::MemStorage;
8
9#[cfg(feature = "rocks")]
10pub mod rocks_store;
11#[cfg(feature = "rocks")]
12pub use rocks_store::RocksStorage;
13
/// One page of results read from an append-only, index-addressed collection.
#[derive(Debug, PartialEq)]
pub struct PagedAppendingCollection<T> {
    /// Marker for the state of the underlying collection, as
    /// `(collection length, deleted item count)`.
    // TODO: change to (total, active)? since dedups isn't "deleted"
    pub version: (u64, u64),
    /// The items that make up this page.
    pub items: Vec<T>,
    /// Cursor to pass back in order to fetch the following page; `None` once
    /// there is nothing further to read.
    pub next: Option<u64>,
    /// Item count reported for the whole collection, not just this page.
    pub total: u64,
}
21
/// A page from a collection whose keys are sorted rather than indexed.
///
/// The guarantees are weaker than those of `PagedAppendingCollection`: paging
/// through it might not yield a totally consistent snapshot. It should still
/// avoid duplicates, and each individual page should at least be internally
/// consistent.
#[derive(Debug, PartialEq, Default)]
pub struct PagedOrderedCollection<T, K>
where
    K: Ord,
{
    /// The items that make up this page.
    pub items: Vec<T>,
    /// Sort key to resume from for the following page; `None` when there is
    /// no further page.
    pub next: Option<K>,
}
32
33#[derive(Debug, Deserialize, Serialize, PartialEq)]
34pub struct StorageStats {
35 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
36 /// for example: new user A follows users B and C. this count will only increment by one, for A.
37 pub dids: u64,
38
39 /// estimate targets * distinct (collection, path)s to reference them.
40 /// distinct targets alone are currently challenging to estimate.
41 pub targetables: u64,
42
43 /// estimate of the count of atproto records seen that contain links.
44 /// records with multiple links are single-counted.
45 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
46 pub linking_records: u64,
47
48 /// first jetstream cursor when this instance first started
49 pub started_at: Option<u64>,
50
51 /// anything else we want to throw in
52 pub other_data: HashMap<String, u64>,
53}
54
55pub trait LinkStorage: Send + Sync {
56 /// jetstream cursor from last saved actions, if available
57 fn get_cursor(&mut self) -> Result<Option<u64>> {
58 Ok(None)
59 }
60
61 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>;
62
63 // readers are off from the writer instance
64 fn to_readable(&mut self) -> impl LinkReader;
65}
66
67pub trait LinkReader: Clone + Send + Sync + 'static {
68 #[allow(clippy::too_many_arguments)]
69 fn get_many_to_many_counts(
70 &self,
71 target: &str,
72 collection: &str,
73 path: &str,
74 path_to_other: &str,
75 limit: u64,
76 after: Option<String>,
77 filter_dids: &HashSet<Did>,
78 filter_to_targets: &HashSet<String>,
79 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
80
81 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
82
83 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
84
85 fn get_links(
86 &self,
87 target: &str,
88 collection: &str,
89 path: &str,
90 limit: u64,
91 until: Option<u64>,
92 filter_dids: &HashSet<Did>,
93 ) -> Result<PagedAppendingCollection<RecordId>>;
94
95 fn get_distinct_dids(
96 &self,
97 target: &str,
98 collection: &str,
99 path: &str,
100 limit: u64,
101 until: Option<u64>,
102 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
103
104 fn get_all_record_counts(&self, _target: &str)
105 -> Result<HashMap<String, HashMap<String, u64>>>;
106
107 fn get_all_counts(
108 &self,
109 _target: &str,
110 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>;
111
112 /// assume all stats are estimates, since exact counts are very challenging for LSMs
113 fn get_stats(&self) -> Result<StorageStats>;
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119 use links::{CollectedLink, Link};
120 use std::ops::RangeBounds;
121
// Declares one `#[test]` function named `$name` that runs `$body` against
// every available storage backend: always the in-memory store, and also the
// rocksdb store when the "rocks" feature is enabled. Inside `$body` the
// backend under test is bound to the identifier given between the pipes.
macro_rules! test_each_storage {
    ($name:ident, |$store:ident| $body:block) => {
        #[test]
        fn $name() -> Result<()> {
            {
                println!("=> testing with memstorage backend");
                #[allow(unused_mut)]
                let mut $store = MemStorage::new();
                $body
            }

            #[cfg(feature = "rocks")]
            {
                println!("=> testing with rocksdb backend");
                // the temp dir handle stays bound for the whole block, so the
                // directory outlives the database opened inside it
                let db_dir = tempfile::tempdir()?;
                #[allow(unused_mut)]
                let mut $store = RocksStorage::new(db_dir.path())?;
                $body
            }

            Ok(())
        }
    };
}
146
147 fn assert_stats(
148 stats: StorageStats,
149 dids: impl RangeBounds<u64>,
150 targetables: impl RangeBounds<u64>,
151 linking_records: impl RangeBounds<u64>,
152 ) {
153 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) {
154 assert!(
155 rb.contains(&stat),
156 "{name:?}: {stat:?} not in range {:?}–{:?}",
157 rb.start_bound(),
158 rb.end_bound()
159 );
160 }
161 check("dids", stats.dids, dids);
162 check("targetables", stats.targetables, targetables);
163 check("linking_records", stats.linking_records, linking_records);
164 }
165
166 test_each_storage!(test_empty, |storage| {
167 assert_eq!(storage.get_count("", "", "")?, 0);
168 assert_eq!(storage.get_count("a", "b", "c")?, 0);
169 assert_eq!(
170 storage.get_count(
171 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f",
172 "app.t.c",
173 ".reply.parent.uri"
174 )?,
175 0
176 );
177 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
178 assert_eq!(
179 storage.get_links(
180 "a.com",
181 "app.t.c",
182 ".abc.uri",
183 100,
184 None,
185 &HashSet::default()
186 )?,
187 PagedAppendingCollection {
188 version: (0, 0),
189 items: vec![],
190 next: None,
191 total: 0,
192 }
193 );
194 assert_eq!(
195 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
196 PagedAppendingCollection {
197 version: (0, 0),
198 items: vec![],
199 next: None,
200 total: 0,
201 }
202 );
203 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new());
204 assert_eq!(
205 storage.get_all_record_counts("bad-example.com")?,
206 HashMap::new()
207 );
208
209 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0);
210 });
211
212 test_each_storage!(test_add_link, |storage| {
213 storage.push(
214 &ActionableEvent::CreateLinks {
215 record_id: RecordId {
216 did: "did:plc:asdf".into(),
217 collection: "app.t.c".into(),
218 rkey: "fdsa".into(),
219 },
220 links: vec![CollectedLink {
221 target: Link::Uri("e.com".into()),
222 path: ".abc.uri".into(),
223 }],
224 },
225 0,
226 )?;
227 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
228 assert_eq!(
229 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
230 1
231 );
232 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0);
233 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0);
234 assert_eq!(
235 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?,
236 0
237 );
238 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
239 });
240
241 test_each_storage!(test_links, |storage| {
242 storage.push(
243 &ActionableEvent::CreateLinks {
244 record_id: RecordId {
245 did: "did:plc:asdf".into(),
246 collection: "app.t.c".into(),
247 rkey: "fdsa".into(),
248 },
249 links: vec![CollectedLink {
250 target: Link::Uri("e.com".into()),
251 path: ".abc.uri".into(),
252 }],
253 },
254 0,
255 )?;
256
257 // delete under the wrong collection
258 storage.push(
259 &ActionableEvent::DeleteRecord(RecordId {
260 did: "did:plc:asdf".into(),
261 collection: "app.test.wrongcollection".into(),
262 rkey: "fdsa".into(),
263 }),
264 0,
265 )?;
266 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
267
268 // delete under the wrong rkey
269 storage.push(
270 &ActionableEvent::DeleteRecord(RecordId {
271 did: "did:plc:asdf".into(),
272 collection: "app.t.c".into(),
273 rkey: "wrongkey".into(),
274 }),
275 0,
276 )?;
277 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
278
279 // finally actually delete it
280 storage.push(
281 &ActionableEvent::DeleteRecord(RecordId {
282 did: "did:plc:asdf".into(),
283 collection: "app.t.c".into(),
284 rkey: "fdsa".into(),
285 }),
286 0,
287 )?;
288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
289
290 // put it back
291 storage.push(
292 &ActionableEvent::CreateLinks {
293 record_id: RecordId {
294 did: "did:plc:asdf".into(),
295 collection: "app.t.c".into(),
296 rkey: "fdsa".into(),
297 },
298 links: vec![CollectedLink {
299 target: Link::Uri("e.com".into()),
300 path: ".abc.uri".into(),
301 }],
302 },
303 0,
304 )?;
305 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
306 assert_eq!(
307 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
308 1
309 );
310
311 // add another link from this user
312 storage.push(
313 &ActionableEvent::CreateLinks {
314 record_id: RecordId {
315 did: "did:plc:asdf".into(),
316 collection: "app.t.c".into(),
317 rkey: "fdsa2".into(),
318 },
319 links: vec![CollectedLink {
320 target: Link::Uri("e.com".into()),
321 path: ".abc.uri".into(),
322 }],
323 },
324 0,
325 )?;
326 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
327 assert_eq!(
328 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
329 1
330 );
331
332 // add a link from someone else
333 storage.push(
334 &ActionableEvent::CreateLinks {
335 record_id: RecordId {
336 did: "did:plc:asdfasdf".into(),
337 collection: "app.t.c".into(),
338 rkey: "fdsa".into(),
339 },
340 links: vec![CollectedLink {
341 target: Link::Uri("e.com".into()),
342 path: ".abc.uri".into(),
343 }],
344 },
345 0,
346 )?;
347 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3);
348 assert_eq!(
349 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
350 2
351 );
352
353 // aaaand delete the first one again
354 storage.push(
355 &ActionableEvent::DeleteRecord(RecordId {
356 did: "did:plc:asdf".into(),
357 collection: "app.t.c".into(),
358 rkey: "fdsa".into(),
359 }),
360 0,
361 )?;
362 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
363 assert_eq!(
364 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
365 2
366 );
367 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2);
368 });
369
370 test_each_storage!(test_two_user_links_delete_one, |storage| {
371 // create the first link
372 storage.push(
373 &ActionableEvent::CreateLinks {
374 record_id: RecordId {
375 did: "did:plc:asdf".into(),
376 collection: "app.t.c".into(),
377 rkey: "A".into(),
378 },
379 links: vec![CollectedLink {
380 target: Link::Uri("e.com".into()),
381 path: ".abc.uri".into(),
382 }],
383 },
384 0,
385 )?;
386 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
387 assert_eq!(
388 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
389 1
390 );
391
392 // create the second link (same user, different rkey)
393 storage.push(
394 &ActionableEvent::CreateLinks {
395 record_id: RecordId {
396 did: "did:plc:asdf".into(),
397 collection: "app.t.c".into(),
398 rkey: "B".into(),
399 },
400 links: vec![CollectedLink {
401 target: Link::Uri("e.com".into()),
402 path: ".abc.uri".into(),
403 }],
404 },
405 0,
406 )?;
407 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
408 assert_eq!(
409 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
410 1
411 );
412
413 // aaaand delete the first link
414 storage.push(
415 &ActionableEvent::DeleteRecord(RecordId {
416 did: "did:plc:asdf".into(),
417 collection: "app.t.c".into(),
418 rkey: "A".into(),
419 }),
420 0,
421 )?;
422 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
423 assert_eq!(
424 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
425 1
426 );
427 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
428 });
429
430 test_each_storage!(test_accounts, |storage| {
431 // create two links
432 storage.push(
433 &ActionableEvent::CreateLinks {
434 record_id: RecordId {
435 did: "did:plc:asdf".into(),
436 collection: "app.t.c".into(),
437 rkey: "A".into(),
438 },
439 links: vec![CollectedLink {
440 target: Link::Uri("a.com".into()),
441 path: ".abc.uri".into(),
442 }],
443 },
444 0,
445 )?;
446 storage.push(
447 &ActionableEvent::CreateLinks {
448 record_id: RecordId {
449 did: "did:plc:asdf".into(),
450 collection: "app.t.c".into(),
451 rkey: "B".into(),
452 },
453 links: vec![CollectedLink {
454 target: Link::Uri("b.com".into()),
455 path: ".abc.uri".into(),
456 }],
457 },
458 0,
459 )?;
460 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
461 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1);
462
463 // and a third from a different account
464 storage.push(
465 &ActionableEvent::CreateLinks {
466 record_id: RecordId {
467 did: "did:plc:fdsa".into(),
468 collection: "app.t.c".into(),
469 rkey: "A".into(),
470 },
471 links: vec![CollectedLink {
472 target: Link::Uri("a.com".into()),
473 path: ".abc.uri".into(),
474 }],
475 },
476 0,
477 )?;
478 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2);
479
480 // delete the first account
481 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?;
482 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
483 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0);
484 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1);
485 });
486
487 test_each_storage!(multi_link, |storage| {
488 storage.push(
489 &ActionableEvent::CreateLinks {
490 record_id: RecordId {
491 did: "did:plc:asdf".into(),
492 collection: "app.t.c".into(),
493 rkey: "fdsa".into(),
494 },
495 links: vec![
496 CollectedLink {
497 target: Link::Uri("e.com".into()),
498 path: ".abc.uri".into(),
499 },
500 CollectedLink {
501 target: Link::Uri("f.com".into()),
502 path: ".xyz[].uri".into(),
503 },
504 CollectedLink {
505 target: Link::Uri("g.com".into()),
506 path: ".xyz[].uri".into(),
507 },
508 ],
509 },
510 0,
511 )?;
512 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
513 assert_eq!(
514 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
515 1
516 );
517 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
518 assert_eq!(
519 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?,
520 1
521 );
522 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
523 assert_eq!(
524 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?,
525 1
526 );
527
528 storage.push(
529 &ActionableEvent::DeleteRecord(RecordId {
530 did: "did:plc:asdf".into(),
531 collection: "app.t.c".into(),
532 rkey: "fdsa".into(),
533 }),
534 0,
535 )?;
536 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
537 assert_eq!(
538 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
539 0
540 );
541 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0);
542 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
543 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0);
544 });
545
546 test_each_storage!(update_link, |storage| {
547 // create the links
548 storage.push(
549 &ActionableEvent::CreateLinks {
550 record_id: RecordId {
551 did: "did:plc:asdf".into(),
552 collection: "app.t.c".into(),
553 rkey: "fdsa".into(),
554 },
555 links: vec![
556 CollectedLink {
557 target: Link::Uri("e.com".into()),
558 path: ".abc.uri".into(),
559 },
560 CollectedLink {
561 target: Link::Uri("f.com".into()),
562 path: ".xyz[].uri".into(),
563 },
564 CollectedLink {
565 target: Link::Uri("g.com".into()),
566 path: ".xyz[].uri".into(),
567 },
568 ],
569 },
570 0,
571 )?;
572 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
573 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
574 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
575
576 // update them
577 storage.push(
578 &ActionableEvent::UpdateLinks {
579 record_id: RecordId {
580 did: "did:plc:asdf".into(),
581 collection: "app.t.c".into(),
582 rkey: "fdsa".into(),
583 },
584 new_links: vec![
585 CollectedLink {
586 target: Link::Uri("h.com".into()),
587 path: ".abc.uri".into(),
588 },
589 CollectedLink {
590 target: Link::Uri("f.com".into()),
591 path: ".xyz[].uri".into(),
592 },
593 CollectedLink {
594 target: Link::Uri("i.com".into()),
595 path: ".xyz[].uri".into(),
596 },
597 ],
598 },
599 0,
600 )?;
601 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
602 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1);
603 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
604 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
605 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1);
606 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1);
607 });
608
609 test_each_storage!(update_no_links_to_links, |storage| {
610 // update without prior create (consumer would have filtered out the original)
611 storage.push(
612 &ActionableEvent::UpdateLinks {
613 record_id: RecordId {
614 did: "did:plc:asdf".into(),
615 collection: "app.t.c".into(),
616 rkey: "asdf".into(),
617 },
618 new_links: vec![CollectedLink {
619 target: Link::Uri("a.com".into()),
620 path: ".abc.uri".into(),
621 }],
622 },
623 0,
624 )?;
625 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
626 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
627 });
628
629 test_each_storage!(delete_multi_link_same_target, |storage| {
630 storage.push(
631 &ActionableEvent::CreateLinks {
632 record_id: RecordId {
633 did: "did:plc:asdf".into(),
634 collection: "app.t.c".into(),
635 rkey: "asdf".into(),
636 },
637 links: vec![
638 CollectedLink {
639 target: Link::Uri("a.com".into()),
640 path: ".abc.uri".into(),
641 },
642 CollectedLink {
643 target: Link::Uri("a.com".into()),
644 path: ".def.uri".into(),
645 },
646 ],
647 },
648 0,
649 )?;
650 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
651 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1);
652
653 storage.push(
654 &ActionableEvent::DeleteRecord(RecordId {
655 did: "did:plc:asdf".into(),
656 collection: "app.t.c".into(),
657 rkey: "asdf".into(),
658 }),
659 0,
660 )?;
661 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0);
662 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0);
663 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0);
664 });
665
666 test_each_storage!(get_links_basic, |storage| {
667 storage.push(
668 &ActionableEvent::CreateLinks {
669 record_id: RecordId {
670 did: "did:plc:asdf".into(),
671 collection: "app.t.c".into(),
672 rkey: "asdf".into(),
673 },
674 links: vec![CollectedLink {
675 target: Link::Uri("a.com".into()),
676 path: ".abc.uri".into(),
677 }],
678 },
679 0,
680 )?;
681 assert_eq!(
682 storage.get_links(
683 "a.com",
684 "app.t.c",
685 ".abc.uri",
686 100,
687 None,
688 &HashSet::default()
689 )?,
690 PagedAppendingCollection {
691 version: (1, 0),
692 items: vec![RecordId {
693 did: "did:plc:asdf".into(),
694 collection: "app.t.c".into(),
695 rkey: "asdf".into(),
696 }],
697 next: None,
698 total: 1,
699 }
700 );
701 assert_eq!(
702 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
703 PagedAppendingCollection {
704 version: (1, 0),
705 items: vec!["did:plc:asdf".into()],
706 next: None,
707 total: 1,
708 }
709 );
710 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
711 });
712
713 test_each_storage!(get_links_paged, |storage| {
714 for i in 1..=5 {
715 storage.push(
716 &ActionableEvent::CreateLinks {
717 record_id: RecordId {
718 did: format!("did:plc:asdf-{i}").into(),
719 collection: "app.t.c".into(),
720 rkey: "asdf".into(),
721 },
722 links: vec![CollectedLink {
723 target: Link::Uri("a.com".into()),
724 path: ".abc.uri".into(),
725 }],
726 },
727 0,
728 )?;
729 }
730 let links =
731 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
732 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?;
733 assert_eq!(
734 links,
735 PagedAppendingCollection {
736 version: (5, 0),
737 items: vec![
738 RecordId {
739 did: "did:plc:asdf-5".into(),
740 collection: "app.t.c".into(),
741 rkey: "asdf".into(),
742 },
743 RecordId {
744 did: "did:plc:asdf-4".into(),
745 collection: "app.t.c".into(),
746 rkey: "asdf".into(),
747 },
748 ],
749 next: Some(3),
750 total: 5,
751 }
752 );
753 assert_eq!(
754 dids,
755 PagedAppendingCollection {
756 version: (5, 0),
757 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()],
758 next: Some(3),
759 total: 5,
760 }
761 );
762 let links = storage.get_links(
763 "a.com",
764 "app.t.c",
765 ".abc.uri",
766 2,
767 links.next,
768 &HashSet::default(),
769 )?;
770 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
771 assert_eq!(
772 links,
773 PagedAppendingCollection {
774 version: (5, 0),
775 items: vec![
776 RecordId {
777 did: "did:plc:asdf-3".into(),
778 collection: "app.t.c".into(),
779 rkey: "asdf".into(),
780 },
781 RecordId {
782 did: "did:plc:asdf-2".into(),
783 collection: "app.t.c".into(),
784 rkey: "asdf".into(),
785 },
786 ],
787 next: Some(1),
788 total: 5,
789 }
790 );
791 assert_eq!(
792 dids,
793 PagedAppendingCollection {
794 version: (5, 0),
795 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()],
796 next: Some(1),
797 total: 5,
798 }
799 );
800 let links = storage.get_links(
801 "a.com",
802 "app.t.c",
803 ".abc.uri",
804 2,
805 links.next,
806 &HashSet::default(),
807 )?;
808 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
809 assert_eq!(
810 links,
811 PagedAppendingCollection {
812 version: (5, 0),
813 items: vec![RecordId {
814 did: "did:plc:asdf-1".into(),
815 collection: "app.t.c".into(),
816 rkey: "asdf".into(),
817 },],
818 next: None,
819 total: 5,
820 }
821 );
822 assert_eq!(
823 dids,
824 PagedAppendingCollection {
825 version: (5, 0),
826 items: vec!["did:plc:asdf-1".into()],
827 next: None,
828 total: 5,
829 }
830 );
831 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
832 });
833
834 test_each_storage!(get_filtered_links, |storage| {
835 let links = storage.get_links(
836 "a.com",
837 "app.t.c",
838 ".abc.uri",
839 2,
840 None,
841 &HashSet::from([Did("did:plc:linker".to_string())]),
842 )?;
843 assert_eq!(
844 links,
845 PagedAppendingCollection {
846 version: (0, 0),
847 items: vec![],
848 next: None,
849 total: 0,
850 }
851 );
852
853 storage.push(
854 &ActionableEvent::CreateLinks {
855 record_id: RecordId {
856 did: "did:plc:linker".into(),
857 collection: "app.t.c".into(),
858 rkey: "asdf".into(),
859 },
860 links: vec![CollectedLink {
861 target: Link::Uri("a.com".into()),
862 path: ".abc.uri".into(),
863 }],
864 },
865 0,
866 )?;
867
868 let links = storage.get_links(
869 "a.com",
870 "app.t.c",
871 ".abc.uri",
872 2,
873 None,
874 &HashSet::from([Did("did:plc:linker".to_string())]),
875 )?;
876 assert_eq!(
877 links,
878 PagedAppendingCollection {
879 version: (1, 0),
880 items: vec![RecordId {
881 did: "did:plc:linker".into(),
882 collection: "app.t.c".into(),
883 rkey: "asdf".into(),
884 },],
885 next: None,
886 total: 1,
887 }
888 );
889
890 let links = storage.get_links(
891 "a.com",
892 "app.t.c",
893 ".abc.uri",
894 2,
895 None,
896 &HashSet::from([Did("did:plc:someone-else".to_string())]),
897 )?;
898 assert_eq!(
899 links,
900 PagedAppendingCollection {
901 version: (0, 0),
902 items: vec![],
903 next: None,
904 total: 0,
905 }
906 );
907
908 storage.push(
909 &ActionableEvent::CreateLinks {
910 record_id: RecordId {
911 did: "did:plc:linker".into(),
912 collection: "app.t.c".into(),
913 rkey: "asdf-2".into(),
914 },
915 links: vec![CollectedLink {
916 target: Link::Uri("a.com".into()),
917 path: ".abc.uri".into(),
918 }],
919 },
920 0,
921 )?;
922 storage.push(
923 &ActionableEvent::CreateLinks {
924 record_id: RecordId {
925 did: "did:plc:someone-else".into(),
926 collection: "app.t.c".into(),
927 rkey: "asdf".into(),
928 },
929 links: vec![CollectedLink {
930 target: Link::Uri("a.com".into()),
931 path: ".abc.uri".into(),
932 }],
933 },
934 0,
935 )?;
936
937 let links = storage.get_links(
938 "a.com",
939 "app.t.c",
940 ".abc.uri",
941 2,
942 None,
943 &HashSet::from([Did("did:plc:linker".to_string())]),
944 )?;
945 assert_eq!(
946 links,
947 PagedAppendingCollection {
948 version: (2, 0),
949 items: vec![
950 RecordId {
951 did: "did:plc:linker".into(),
952 collection: "app.t.c".into(),
953 rkey: "asdf-2".into(),
954 },
955 RecordId {
956 did: "did:plc:linker".into(),
957 collection: "app.t.c".into(),
958 rkey: "asdf".into(),
959 },
960 ],
961 next: None,
962 total: 2,
963 }
964 );
965
966 let links = storage.get_links(
967 "a.com",
968 "app.t.c",
969 ".abc.uri",
970 2,
971 None,
972 &HashSet::from([
973 Did("did:plc:linker".to_string()),
974 Did("did:plc:someone-else".to_string()),
975 ]),
976 )?;
977 assert_eq!(
978 links,
979 PagedAppendingCollection {
980 version: (3, 0),
981 items: vec![
982 RecordId {
983 did: "did:plc:someone-else".into(),
984 collection: "app.t.c".into(),
985 rkey: "asdf".into(),
986 },
987 RecordId {
988 did: "did:plc:linker".into(),
989 collection: "app.t.c".into(),
990 rkey: "asdf-2".into(),
991 },
992 ],
993 next: Some(1),
994 total: 3,
995 }
996 );
997
998 let links = storage.get_links(
999 "a.com",
1000 "app.t.c",
1001 ".abc.uri",
1002 2,
1003 None,
1004 &HashSet::from([Did("did:plc:someone-unknown".to_string())]),
1005 )?;
1006 assert_eq!(
1007 links,
1008 PagedAppendingCollection {
1009 version: (0, 0),
1010 items: vec![],
1011 next: None,
1012 total: 0,
1013 }
1014 );
1015 });
1016
1017 test_each_storage!(get_links_exact_multiple, |storage| {
1018 for i in 1..=4 {
1019 storage.push(
1020 &ActionableEvent::CreateLinks {
1021 record_id: RecordId {
1022 did: format!("did:plc:asdf-{i}").into(),
1023 collection: "app.t.c".into(),
1024 rkey: "asdf".into(),
1025 },
1026 links: vec![CollectedLink {
1027 target: Link::Uri("a.com".into()),
1028 path: ".abc.uri".into(),
1029 }],
1030 },
1031 0,
1032 )?;
1033 }
1034 let links =
1035 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1036 assert_eq!(
1037 links,
1038 PagedAppendingCollection {
1039 version: (4, 0),
1040 items: vec![
1041 RecordId {
1042 did: "did:plc:asdf-4".into(),
1043 collection: "app.t.c".into(),
1044 rkey: "asdf".into(),
1045 },
1046 RecordId {
1047 did: "did:plc:asdf-3".into(),
1048 collection: "app.t.c".into(),
1049 rkey: "asdf".into(),
1050 },
1051 ],
1052 next: Some(2),
1053 total: 4,
1054 }
1055 );
1056 let links = storage.get_links(
1057 "a.com",
1058 "app.t.c",
1059 ".abc.uri",
1060 2,
1061 links.next,
1062 &HashSet::default(),
1063 )?;
1064 assert_eq!(
1065 links,
1066 PagedAppendingCollection {
1067 version: (4, 0),
1068 items: vec![
1069 RecordId {
1070 did: "did:plc:asdf-2".into(),
1071 collection: "app.t.c".into(),
1072 rkey: "asdf".into(),
1073 },
1074 RecordId {
1075 did: "did:plc:asdf-1".into(),
1076 collection: "app.t.c".into(),
1077 rkey: "asdf".into(),
1078 },
1079 ],
1080 next: None,
1081 total: 4,
1082 }
1083 );
1084 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1085 });
1086
1087 test_each_storage!(page_links_while_new_links_arrive, |storage| {
1088 for i in 1..=4 {
1089 storage.push(
1090 &ActionableEvent::CreateLinks {
1091 record_id: RecordId {
1092 did: format!("did:plc:asdf-{i}").into(),
1093 collection: "app.t.c".into(),
1094 rkey: "asdf".into(),
1095 },
1096 links: vec![CollectedLink {
1097 target: Link::Uri("a.com".into()),
1098 path: ".abc.uri".into(),
1099 }],
1100 },
1101 0,
1102 )?;
1103 }
1104 let links =
1105 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1106 assert_eq!(
1107 links,
1108 PagedAppendingCollection {
1109 version: (4, 0),
1110 items: vec![
1111 RecordId {
1112 did: "did:plc:asdf-4".into(),
1113 collection: "app.t.c".into(),
1114 rkey: "asdf".into(),
1115 },
1116 RecordId {
1117 did: "did:plc:asdf-3".into(),
1118 collection: "app.t.c".into(),
1119 rkey: "asdf".into(),
1120 },
1121 ],
1122 next: Some(2),
1123 total: 4,
1124 }
1125 );
1126 storage.push(
1127 &ActionableEvent::CreateLinks {
1128 record_id: RecordId {
1129 did: "did:plc:asdf-5".into(),
1130 collection: "app.t.c".into(),
1131 rkey: "asdf".into(),
1132 },
1133 links: vec![CollectedLink {
1134 target: Link::Uri("a.com".into()),
1135 path: ".abc.uri".into(),
1136 }],
1137 },
1138 0,
1139 )?;
1140 let links = storage.get_links(
1141 "a.com",
1142 "app.t.c",
1143 ".abc.uri",
1144 2,
1145 links.next,
1146 &HashSet::default(),
1147 )?;
1148 assert_eq!(
1149 links,
1150 PagedAppendingCollection {
1151 version: (5, 0),
1152 items: vec![
1153 RecordId {
1154 did: "did:plc:asdf-2".into(),
1155 collection: "app.t.c".into(),
1156 rkey: "asdf".into(),
1157 },
1158 RecordId {
1159 did: "did:plc:asdf-1".into(),
1160 collection: "app.t.c".into(),
1161 rkey: "asdf".into(),
1162 },
1163 ],
1164 next: None,
1165 total: 5,
1166 }
1167 );
1168 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
1169 });
1170
1171 test_each_storage!(page_links_while_some_are_deleted, |storage| {
1172 for i in 1..=4 {
1173 storage.push(
1174 &ActionableEvent::CreateLinks {
1175 record_id: RecordId {
1176 did: format!("did:plc:asdf-{i}").into(),
1177 collection: "app.t.c".into(),
1178 rkey: "asdf".into(),
1179 },
1180 links: vec![CollectedLink {
1181 target: Link::Uri("a.com".into()),
1182 path: ".abc.uri".into(),
1183 }],
1184 },
1185 0,
1186 )?;
1187 }
1188 let links =
1189 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1190 assert_eq!(
1191 links,
1192 PagedAppendingCollection {
1193 version: (4, 0),
1194 items: vec![
1195 RecordId {
1196 did: "did:plc:asdf-4".into(),
1197 collection: "app.t.c".into(),
1198 rkey: "asdf".into(),
1199 },
1200 RecordId {
1201 did: "did:plc:asdf-3".into(),
1202 collection: "app.t.c".into(),
1203 rkey: "asdf".into(),
1204 },
1205 ],
1206 next: Some(2),
1207 total: 4,
1208 }
1209 );
1210 storage.push(
1211 &ActionableEvent::DeleteRecord(RecordId {
1212 did: "did:plc:asdf-2".into(),
1213 collection: "app.t.c".into(),
1214 rkey: "asdf".into(),
1215 }),
1216 0,
1217 )?;
1218 let links = storage.get_links(
1219 "a.com",
1220 "app.t.c",
1221 ".abc.uri",
1222 2,
1223 links.next,
1224 &HashSet::default(),
1225 )?;
1226 assert_eq!(
1227 links,
1228 PagedAppendingCollection {
1229 version: (4, 1),
1230 items: vec![RecordId {
1231 did: "did:plc:asdf-1".into(),
1232 collection: "app.t.c".into(),
1233 rkey: "asdf".into(),
1234 },],
1235 next: None,
1236 total: 3,
1237 }
1238 );
1239 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
1240 });
1241
1242 test_each_storage!(page_links_accounts_inactive, |storage| {
1243 for i in 1..=4 {
1244 storage.push(
1245 &ActionableEvent::CreateLinks {
1246 record_id: RecordId {
1247 did: format!("did:plc:asdf-{i}").into(),
1248 collection: "app.t.c".into(),
1249 rkey: "asdf".into(),
1250 },
1251 links: vec![CollectedLink {
1252 target: Link::Uri("a.com".into()),
1253 path: ".abc.uri".into(),
1254 }],
1255 },
1256 0,
1257 )?;
1258 }
1259 let links =
1260 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1261 assert_eq!(
1262 links,
1263 PagedAppendingCollection {
1264 version: (4, 0),
1265 items: vec![
1266 RecordId {
1267 did: "did:plc:asdf-4".into(),
1268 collection: "app.t.c".into(),
1269 rkey: "asdf".into(),
1270 },
1271 RecordId {
1272 did: "did:plc:asdf-3".into(),
1273 collection: "app.t.c".into(),
1274 rkey: "asdf".into(),
1275 },
1276 ],
1277 next: Some(2),
1278 total: 4,
1279 }
1280 );
1281 storage.push(
1282 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1283 0,
1284 )?;
1285 let links = storage.get_links(
1286 "a.com",
1287 "app.t.c",
1288 ".abc.uri",
1289 2,
1290 links.next,
1291 &HashSet::default(),
1292 )?;
1293 assert_eq!(
1294 links,
1295 PagedAppendingCollection {
1296 version: (4, 0),
1297 items: vec![RecordId {
1298 did: "did:plc:asdf-2".into(),
1299 collection: "app.t.c".into(),
1300 rkey: "asdf".into(),
1301 },],
1302 next: None,
1303 total: 4,
1304 }
1305 );
1306 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1307 });
1308
1309 test_each_storage!(get_all_counts, |storage| {
1310 storage.push(
1311 &ActionableEvent::CreateLinks {
1312 record_id: RecordId {
1313 did: "did:plc:asdf".into(),
1314 collection: "app.t.c".into(),
1315 rkey: "asdf".into(),
1316 },
1317 links: vec![
1318 CollectedLink {
1319 target: Link::Uri("a.com".into()),
1320 path: ".abc.uri".into(),
1321 },
1322 CollectedLink {
1323 target: Link::Uri("a.com".into()),
1324 path: ".def.uri".into(),
1325 },
1326 ],
1327 },
1328 0,
1329 )?;
1330 assert_eq!(storage.get_all_record_counts("a.com")?, {
1331 let mut counts = HashMap::new();
1332 let mut t_c_counts = HashMap::new();
1333 t_c_counts.insert(".abc.uri".into(), 1);
1334 t_c_counts.insert(".def.uri".into(), 1);
1335 counts.insert("app.t.c".into(), t_c_counts);
1336 counts
1337 });
1338 assert_eq!(storage.get_all_counts("a.com")?, {
1339 let mut counts = HashMap::new();
1340 let mut t_c_counts = HashMap::new();
1341 t_c_counts.insert(
1342 ".abc.uri".into(),
1343 CountsByCount {
1344 records: 1,
1345 distinct_dids: 1,
1346 },
1347 );
1348 t_c_counts.insert(
1349 ".def.uri".into(),
1350 CountsByCount {
1351 records: 1,
1352 distinct_dids: 1,
1353 },
1354 );
1355 counts.insert("app.t.c".into(), t_c_counts);
1356 counts
1357 });
1358 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1359 });
1360
1361 //////// many-to-many /////////
1362
1363 test_each_storage!(get_m2m_counts_empty, |storage| {
1364 assert_eq!(
1365 storage.get_many_to_many_counts(
1366 "a.com",
1367 "a.b.c",
1368 ".d.e",
1369 ".f.g",
1370 10,
1371 None,
1372 &HashSet::new(),
1373 &HashSet::new(),
1374 )?,
1375 PagedOrderedCollection {
1376 items: vec![],
1377 next: None,
1378 }
1379 );
1380 });
1381
1382 test_each_storage!(get_m2m_counts_single, |storage| {
1383 storage.push(
1384 &ActionableEvent::CreateLinks {
1385 record_id: RecordId {
1386 did: "did:plc:asdf".into(),
1387 collection: "app.t.c".into(),
1388 rkey: "asdf".into(),
1389 },
1390 links: vec![
1391 CollectedLink {
1392 target: Link::Uri("a.com".into()),
1393 path: ".abc.uri".into(),
1394 },
1395 CollectedLink {
1396 target: Link::Uri("b.com".into()),
1397 path: ".def.uri".into(),
1398 },
1399 CollectedLink {
1400 target: Link::Uri("b.com".into()),
1401 path: ".ghi.uri".into(),
1402 },
1403 ],
1404 },
1405 0,
1406 )?;
1407 assert_eq!(
1408 storage.get_many_to_many_counts(
1409 "a.com",
1410 "app.t.c",
1411 ".abc.uri",
1412 ".def.uri",
1413 10,
1414 None,
1415 &HashSet::new(),
1416 &HashSet::new(),
1417 )?,
1418 PagedOrderedCollection {
1419 items: vec![("b.com".to_string(), 1, 1)],
1420 next: None,
1421 }
1422 );
1423 });
1424
1425 test_each_storage!(get_m2m_counts_filters, |storage| {
1426 storage.push(
1427 &ActionableEvent::CreateLinks {
1428 record_id: RecordId {
1429 did: "did:plc:asdf".into(),
1430 collection: "app.t.c".into(),
1431 rkey: "asdf".into(),
1432 },
1433 links: vec![
1434 CollectedLink {
1435 target: Link::Uri("a.com".into()),
1436 path: ".abc.uri".into(),
1437 },
1438 CollectedLink {
1439 target: Link::Uri("b.com".into()),
1440 path: ".def.uri".into(),
1441 },
1442 ],
1443 },
1444 0,
1445 )?;
1446 storage.push(
1447 &ActionableEvent::CreateLinks {
1448 record_id: RecordId {
1449 did: "did:plc:asdfasdf".into(),
1450 collection: "app.t.c".into(),
1451 rkey: "asdf".into(),
1452 },
1453 links: vec![
1454 CollectedLink {
1455 target: Link::Uri("a.com".into()),
1456 path: ".abc.uri".into(),
1457 },
1458 CollectedLink {
1459 target: Link::Uri("b.com".into()),
1460 path: ".def.uri".into(),
1461 },
1462 ],
1463 },
1464 1,
1465 )?;
1466 storage.push(
1467 &ActionableEvent::CreateLinks {
1468 record_id: RecordId {
1469 did: "did:plc:fdsa".into(),
1470 collection: "app.t.c".into(),
1471 rkey: "asdf".into(),
1472 },
1473 links: vec![
1474 CollectedLink {
1475 target: Link::Uri("a.com".into()),
1476 path: ".abc.uri".into(),
1477 },
1478 CollectedLink {
1479 target: Link::Uri("c.com".into()),
1480 path: ".def.uri".into(),
1481 },
1482 ],
1483 },
1484 2,
1485 )?;
1486 storage.push(
1487 &ActionableEvent::CreateLinks {
1488 record_id: RecordId {
1489 did: "did:plc:fdsa".into(),
1490 collection: "app.t.c".into(),
1491 rkey: "asdf2".into(),
1492 },
1493 links: vec![
1494 CollectedLink {
1495 target: Link::Uri("a.com".into()),
1496 path: ".abc.uri".into(),
1497 },
1498 CollectedLink {
1499 target: Link::Uri("c.com".into()),
1500 path: ".def.uri".into(),
1501 },
1502 ],
1503 },
1504 3,
1505 )?;
1506 assert_eq!(
1507 storage.get_many_to_many_counts(
1508 "a.com",
1509 "app.t.c",
1510 ".abc.uri",
1511 ".def.uri",
1512 10,
1513 None,
1514 &HashSet::new(),
1515 &HashSet::new(),
1516 )?,
1517 PagedOrderedCollection {
1518 items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1519 next: None,
1520 }
1521 );
1522 assert_eq!(
1523 storage.get_many_to_many_counts(
1524 "a.com",
1525 "app.t.c",
1526 ".abc.uri",
1527 ".def.uri",
1528 10,
1529 None,
1530 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1531 &HashSet::new(),
1532 )?,
1533 PagedOrderedCollection {
1534 items: vec![("c.com".to_string(), 2, 1),],
1535 next: None,
1536 }
1537 );
1538 assert_eq!(
1539 storage.get_many_to_many_counts(
1540 "a.com",
1541 "app.t.c",
1542 ".abc.uri",
1543 ".def.uri",
1544 10,
1545 None,
1546 &HashSet::new(),
1547 &HashSet::from_iter(["b.com".to_string()]),
1548 )?,
1549 PagedOrderedCollection {
1550 items: vec![("b.com".to_string(), 2, 2),],
1551 next: None,
1552 }
1553 );
1554 });
1555}