forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId};
2use anyhow::Result;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5
6pub mod mem_store;
7pub use mem_store::MemStorage;
8
9#[cfg(feature = "rocks")]
10pub mod rocks_store;
11#[cfg(feature = "rocks")]
12pub use rocks_store::RocksStorage;
13
14#[derive(Debug, PartialEq)]
15pub struct PagedAppendingCollection<T> {
16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted"
17 pub items: Vec<T>,
18 pub next: Option<u64>,
19 pub total: u64,
20}
21
22#[derive(Debug, Deserialize, Serialize, PartialEq)]
23pub struct StorageStats {
24 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
25 /// for example: new user A follows users B and C. this count will only increment by one, for A.
26 pub dids: u64,
27
28 /// estimate targets * distinct (collection, path)s to reference them.
29 /// distinct targets alone are currently challenging to estimate.
30 pub targetables: u64,
31
32 /// estimate of the count of atproto records seen that contain links.
33 /// records with multiple links are single-counted.
34 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35 pub linking_records: u64,
36}
37
38pub trait LinkStorage: Send + Sync {
39 /// jetstream cursor from last saved actions, if available
40 fn get_cursor(&mut self) -> Result<Option<u64>> {
41 Ok(None)
42 }
43
44 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>;
45
46 // readers are off from the writer instance
47 fn to_readable(&mut self) -> impl LinkReader;
48}
49
50pub trait LinkReader: Clone + Send + Sync + 'static {
51 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
53 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
54
55 fn get_links(
56 &self,
57 target: &str,
58 collection: &str,
59 path: &str,
60 limit: u64,
61 until: Option<u64>,
62 filter_dids: &HashSet<Did>,
63 ) -> Result<PagedAppendingCollection<RecordId>>;
64
65 fn get_distinct_dids(
66 &self,
67 target: &str,
68 collection: &str,
69 path: &str,
70 limit: u64,
71 until: Option<u64>,
72 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
73
74 fn get_all_record_counts(&self, _target: &str)
75 -> Result<HashMap<String, HashMap<String, u64>>>;
76
77 fn get_all_counts(
78 &self,
79 _target: &str,
80 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>;
81
82 /// assume all stats are estimates, since exact counts are very challenging for LSMs
83 fn get_stats(&self) -> Result<StorageStats>;
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use links::{CollectedLink, Link};
90 use std::ops::RangeBounds;
91
92 macro_rules! test_each_storage {
93 ($test_name:ident, |$storage_label:ident| $test_code:block) => {
94 #[test]
95 fn $test_name() -> Result<()> {
96 {
97 println!("=> testing with memstorage backend");
98 #[allow(unused_mut)]
99 let mut $storage_label = MemStorage::new();
100 $test_code
101 }
102
103 #[cfg(feature = "rocks")]
104 {
105 println!("=> testing with rocksdb backend");
106 let rocks_db_path = tempfile::tempdir()?;
107 #[allow(unused_mut)]
108 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?;
109 $test_code
110 }
111
112 Ok(())
113 }
114 };
115 }
116
117 fn assert_stats(
118 stats: StorageStats,
119 dids: impl RangeBounds<u64>,
120 targetables: impl RangeBounds<u64>,
121 linking_records: impl RangeBounds<u64>,
122 ) {
123 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) {
124 assert!(
125 rb.contains(&stat),
126 "{name:?}: {stat:?} not in range {:?}–{:?}",
127 rb.start_bound(),
128 rb.end_bound()
129 );
130 }
131 check("dids", stats.dids, dids);
132 check("targetables", stats.targetables, targetables);
133 check("linking_records", stats.linking_records, linking_records);
134 }
135
136 test_each_storage!(test_empty, |storage| {
137 assert_eq!(storage.get_count("", "", "")?, 0);
138 assert_eq!(storage.get_count("a", "b", "c")?, 0);
139 assert_eq!(
140 storage.get_count(
141 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f",
142 "app.t.c",
143 ".reply.parent.uri"
144 )?,
145 0
146 );
147 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
148 assert_eq!(
149 storage.get_links(
150 "a.com",
151 "app.t.c",
152 ".abc.uri",
153 100,
154 None,
155 &HashSet::default()
156 )?,
157 PagedAppendingCollection {
158 version: (0, 0),
159 items: vec![],
160 next: None,
161 total: 0,
162 }
163 );
164 assert_eq!(
165 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
166 PagedAppendingCollection {
167 version: (0, 0),
168 items: vec![],
169 next: None,
170 total: 0,
171 }
172 );
173 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new());
174 assert_eq!(
175 storage.get_all_record_counts("bad-example.com")?,
176 HashMap::new()
177 );
178
179 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0);
180 });
181
182 test_each_storage!(test_add_link, |storage| {
183 storage.push(
184 &ActionableEvent::CreateLinks {
185 record_id: RecordId {
186 did: "did:plc:asdf".into(),
187 collection: "app.t.c".into(),
188 rkey: "fdsa".into(),
189 },
190 links: vec![CollectedLink {
191 target: Link::Uri("e.com".into()),
192 path: ".abc.uri".into(),
193 }],
194 },
195 0,
196 )?;
197 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
198 assert_eq!(
199 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
200 1
201 );
202 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0);
203 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0);
204 assert_eq!(
205 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?,
206 0
207 );
208 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
209 });
210
211 test_each_storage!(test_links, |storage| {
212 storage.push(
213 &ActionableEvent::CreateLinks {
214 record_id: RecordId {
215 did: "did:plc:asdf".into(),
216 collection: "app.t.c".into(),
217 rkey: "fdsa".into(),
218 },
219 links: vec![CollectedLink {
220 target: Link::Uri("e.com".into()),
221 path: ".abc.uri".into(),
222 }],
223 },
224 0,
225 )?;
226
227 // delete under the wrong collection
228 storage.push(
229 &ActionableEvent::DeleteRecord(RecordId {
230 did: "did:plc:asdf".into(),
231 collection: "app.test.wrongcollection".into(),
232 rkey: "fdsa".into(),
233 }),
234 0,
235 )?;
236 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
237
238 // delete under the wrong rkey
239 storage.push(
240 &ActionableEvent::DeleteRecord(RecordId {
241 did: "did:plc:asdf".into(),
242 collection: "app.t.c".into(),
243 rkey: "wrongkey".into(),
244 }),
245 0,
246 )?;
247 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
248
249 // finally actually delete it
250 storage.push(
251 &ActionableEvent::DeleteRecord(RecordId {
252 did: "did:plc:asdf".into(),
253 collection: "app.t.c".into(),
254 rkey: "fdsa".into(),
255 }),
256 0,
257 )?;
258 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
259
260 // put it back
261 storage.push(
262 &ActionableEvent::CreateLinks {
263 record_id: RecordId {
264 did: "did:plc:asdf".into(),
265 collection: "app.t.c".into(),
266 rkey: "fdsa".into(),
267 },
268 links: vec![CollectedLink {
269 target: Link::Uri("e.com".into()),
270 path: ".abc.uri".into(),
271 }],
272 },
273 0,
274 )?;
275 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
276 assert_eq!(
277 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
278 1
279 );
280
281 // add another link from this user
282 storage.push(
283 &ActionableEvent::CreateLinks {
284 record_id: RecordId {
285 did: "did:plc:asdf".into(),
286 collection: "app.t.c".into(),
287 rkey: "fdsa2".into(),
288 },
289 links: vec![CollectedLink {
290 target: Link::Uri("e.com".into()),
291 path: ".abc.uri".into(),
292 }],
293 },
294 0,
295 )?;
296 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
297 assert_eq!(
298 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
299 1
300 );
301
302 // add a link from someone else
303 storage.push(
304 &ActionableEvent::CreateLinks {
305 record_id: RecordId {
306 did: "did:plc:asdfasdf".into(),
307 collection: "app.t.c".into(),
308 rkey: "fdsa".into(),
309 },
310 links: vec![CollectedLink {
311 target: Link::Uri("e.com".into()),
312 path: ".abc.uri".into(),
313 }],
314 },
315 0,
316 )?;
317 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3);
318 assert_eq!(
319 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
320 2
321 );
322
323 // aaaand delete the first one again
324 storage.push(
325 &ActionableEvent::DeleteRecord(RecordId {
326 did: "did:plc:asdf".into(),
327 collection: "app.t.c".into(),
328 rkey: "fdsa".into(),
329 }),
330 0,
331 )?;
332 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
333 assert_eq!(
334 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
335 2
336 );
337 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2);
338 });
339
340 test_each_storage!(test_two_user_links_delete_one, |storage| {
341 // create the first link
342 storage.push(
343 &ActionableEvent::CreateLinks {
344 record_id: RecordId {
345 did: "did:plc:asdf".into(),
346 collection: "app.t.c".into(),
347 rkey: "A".into(),
348 },
349 links: vec![CollectedLink {
350 target: Link::Uri("e.com".into()),
351 path: ".abc.uri".into(),
352 }],
353 },
354 0,
355 )?;
356 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
357 assert_eq!(
358 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
359 1
360 );
361
362 // create the second link (same user, different rkey)
363 storage.push(
364 &ActionableEvent::CreateLinks {
365 record_id: RecordId {
366 did: "did:plc:asdf".into(),
367 collection: "app.t.c".into(),
368 rkey: "B".into(),
369 },
370 links: vec![CollectedLink {
371 target: Link::Uri("e.com".into()),
372 path: ".abc.uri".into(),
373 }],
374 },
375 0,
376 )?;
377 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
378 assert_eq!(
379 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
380 1
381 );
382
383 // aaaand delete the first link
384 storage.push(
385 &ActionableEvent::DeleteRecord(RecordId {
386 did: "did:plc:asdf".into(),
387 collection: "app.t.c".into(),
388 rkey: "A".into(),
389 }),
390 0,
391 )?;
392 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
393 assert_eq!(
394 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
395 1
396 );
397 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
398 });
399
400 test_each_storage!(test_accounts, |storage| {
401 // create two links
402 storage.push(
403 &ActionableEvent::CreateLinks {
404 record_id: RecordId {
405 did: "did:plc:asdf".into(),
406 collection: "app.t.c".into(),
407 rkey: "A".into(),
408 },
409 links: vec![CollectedLink {
410 target: Link::Uri("a.com".into()),
411 path: ".abc.uri".into(),
412 }],
413 },
414 0,
415 )?;
416 storage.push(
417 &ActionableEvent::CreateLinks {
418 record_id: RecordId {
419 did: "did:plc:asdf".into(),
420 collection: "app.t.c".into(),
421 rkey: "B".into(),
422 },
423 links: vec![CollectedLink {
424 target: Link::Uri("b.com".into()),
425 path: ".abc.uri".into(),
426 }],
427 },
428 0,
429 )?;
430 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
431 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1);
432
433 // and a third from a different account
434 storage.push(
435 &ActionableEvent::CreateLinks {
436 record_id: RecordId {
437 did: "did:plc:fdsa".into(),
438 collection: "app.t.c".into(),
439 rkey: "A".into(),
440 },
441 links: vec![CollectedLink {
442 target: Link::Uri("a.com".into()),
443 path: ".abc.uri".into(),
444 }],
445 },
446 0,
447 )?;
448 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2);
449
450 // delete the first account
451 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?;
452 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
453 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0);
454 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1);
455 });
456
457 test_each_storage!(multi_link, |storage| {
458 storage.push(
459 &ActionableEvent::CreateLinks {
460 record_id: RecordId {
461 did: "did:plc:asdf".into(),
462 collection: "app.t.c".into(),
463 rkey: "fdsa".into(),
464 },
465 links: vec![
466 CollectedLink {
467 target: Link::Uri("e.com".into()),
468 path: ".abc.uri".into(),
469 },
470 CollectedLink {
471 target: Link::Uri("f.com".into()),
472 path: ".xyz[].uri".into(),
473 },
474 CollectedLink {
475 target: Link::Uri("g.com".into()),
476 path: ".xyz[].uri".into(),
477 },
478 ],
479 },
480 0,
481 )?;
482 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
483 assert_eq!(
484 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
485 1
486 );
487 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
488 assert_eq!(
489 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?,
490 1
491 );
492 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
493 assert_eq!(
494 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?,
495 1
496 );
497
498 storage.push(
499 &ActionableEvent::DeleteRecord(RecordId {
500 did: "did:plc:asdf".into(),
501 collection: "app.t.c".into(),
502 rkey: "fdsa".into(),
503 }),
504 0,
505 )?;
506 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
507 assert_eq!(
508 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
509 0
510 );
511 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0);
512 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
513 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0);
514 });
515
516 test_each_storage!(update_link, |storage| {
517 // create the links
518 storage.push(
519 &ActionableEvent::CreateLinks {
520 record_id: RecordId {
521 did: "did:plc:asdf".into(),
522 collection: "app.t.c".into(),
523 rkey: "fdsa".into(),
524 },
525 links: vec![
526 CollectedLink {
527 target: Link::Uri("e.com".into()),
528 path: ".abc.uri".into(),
529 },
530 CollectedLink {
531 target: Link::Uri("f.com".into()),
532 path: ".xyz[].uri".into(),
533 },
534 CollectedLink {
535 target: Link::Uri("g.com".into()),
536 path: ".xyz[].uri".into(),
537 },
538 ],
539 },
540 0,
541 )?;
542 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
543 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
544 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
545
546 // update them
547 storage.push(
548 &ActionableEvent::UpdateLinks {
549 record_id: RecordId {
550 did: "did:plc:asdf".into(),
551 collection: "app.t.c".into(),
552 rkey: "fdsa".into(),
553 },
554 new_links: vec![
555 CollectedLink {
556 target: Link::Uri("h.com".into()),
557 path: ".abc.uri".into(),
558 },
559 CollectedLink {
560 target: Link::Uri("f.com".into()),
561 path: ".xyz[].uri".into(),
562 },
563 CollectedLink {
564 target: Link::Uri("i.com".into()),
565 path: ".xyz[].uri".into(),
566 },
567 ],
568 },
569 0,
570 )?;
571 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
572 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1);
573 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
574 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
575 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1);
576 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1);
577 });
578
579 test_each_storage!(update_no_links_to_links, |storage| {
580 // update without prior create (consumer would have filtered out the original)
581 storage.push(
582 &ActionableEvent::UpdateLinks {
583 record_id: RecordId {
584 did: "did:plc:asdf".into(),
585 collection: "app.t.c".into(),
586 rkey: "asdf".into(),
587 },
588 new_links: vec![CollectedLink {
589 target: Link::Uri("a.com".into()),
590 path: ".abc.uri".into(),
591 }],
592 },
593 0,
594 )?;
595 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
596 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
597 });
598
599 test_each_storage!(delete_multi_link_same_target, |storage| {
600 storage.push(
601 &ActionableEvent::CreateLinks {
602 record_id: RecordId {
603 did: "did:plc:asdf".into(),
604 collection: "app.t.c".into(),
605 rkey: "asdf".into(),
606 },
607 links: vec![
608 CollectedLink {
609 target: Link::Uri("a.com".into()),
610 path: ".abc.uri".into(),
611 },
612 CollectedLink {
613 target: Link::Uri("a.com".into()),
614 path: ".def.uri".into(),
615 },
616 ],
617 },
618 0,
619 )?;
620 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
621 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1);
622
623 storage.push(
624 &ActionableEvent::DeleteRecord(RecordId {
625 did: "did:plc:asdf".into(),
626 collection: "app.t.c".into(),
627 rkey: "asdf".into(),
628 }),
629 0,
630 )?;
631 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0);
632 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0);
633 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0);
634 });
635
636 test_each_storage!(get_links_basic, |storage| {
637 storage.push(
638 &ActionableEvent::CreateLinks {
639 record_id: RecordId {
640 did: "did:plc:asdf".into(),
641 collection: "app.t.c".into(),
642 rkey: "asdf".into(),
643 },
644 links: vec![CollectedLink {
645 target: Link::Uri("a.com".into()),
646 path: ".abc.uri".into(),
647 }],
648 },
649 0,
650 )?;
651 assert_eq!(
652 storage.get_links(
653 "a.com",
654 "app.t.c",
655 ".abc.uri",
656 100,
657 None,
658 &HashSet::default()
659 )?,
660 PagedAppendingCollection {
661 version: (1, 0),
662 items: vec![RecordId {
663 did: "did:plc:asdf".into(),
664 collection: "app.t.c".into(),
665 rkey: "asdf".into(),
666 }],
667 next: None,
668 total: 1,
669 }
670 );
671 assert_eq!(
672 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
673 PagedAppendingCollection {
674 version: (1, 0),
675 items: vec!["did:plc:asdf".into()],
676 next: None,
677 total: 1,
678 }
679 );
680 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
681 });
682
683 test_each_storage!(get_links_paged, |storage| {
684 for i in 1..=5 {
685 storage.push(
686 &ActionableEvent::CreateLinks {
687 record_id: RecordId {
688 did: format!("did:plc:asdf-{i}").into(),
689 collection: "app.t.c".into(),
690 rkey: "asdf".into(),
691 },
692 links: vec![CollectedLink {
693 target: Link::Uri("a.com".into()),
694 path: ".abc.uri".into(),
695 }],
696 },
697 0,
698 )?;
699 }
700 let links =
701 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
702 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?;
703 assert_eq!(
704 links,
705 PagedAppendingCollection {
706 version: (5, 0),
707 items: vec![
708 RecordId {
709 did: "did:plc:asdf-5".into(),
710 collection: "app.t.c".into(),
711 rkey: "asdf".into(),
712 },
713 RecordId {
714 did: "did:plc:asdf-4".into(),
715 collection: "app.t.c".into(),
716 rkey: "asdf".into(),
717 },
718 ],
719 next: Some(3),
720 total: 5,
721 }
722 );
723 assert_eq!(
724 dids,
725 PagedAppendingCollection {
726 version: (5, 0),
727 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()],
728 next: Some(3),
729 total: 5,
730 }
731 );
732 let links = storage.get_links(
733 "a.com",
734 "app.t.c",
735 ".abc.uri",
736 2,
737 links.next,
738 &HashSet::default(),
739 )?;
740 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
741 assert_eq!(
742 links,
743 PagedAppendingCollection {
744 version: (5, 0),
745 items: vec![
746 RecordId {
747 did: "did:plc:asdf-3".into(),
748 collection: "app.t.c".into(),
749 rkey: "asdf".into(),
750 },
751 RecordId {
752 did: "did:plc:asdf-2".into(),
753 collection: "app.t.c".into(),
754 rkey: "asdf".into(),
755 },
756 ],
757 next: Some(1),
758 total: 5,
759 }
760 );
761 assert_eq!(
762 dids,
763 PagedAppendingCollection {
764 version: (5, 0),
765 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()],
766 next: Some(1),
767 total: 5,
768 }
769 );
770 let links = storage.get_links(
771 "a.com",
772 "app.t.c",
773 ".abc.uri",
774 2,
775 links.next,
776 &HashSet::default(),
777 )?;
778 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
779 assert_eq!(
780 links,
781 PagedAppendingCollection {
782 version: (5, 0),
783 items: vec![RecordId {
784 did: "did:plc:asdf-1".into(),
785 collection: "app.t.c".into(),
786 rkey: "asdf".into(),
787 },],
788 next: None,
789 total: 5,
790 }
791 );
792 assert_eq!(
793 dids,
794 PagedAppendingCollection {
795 version: (5, 0),
796 items: vec!["did:plc:asdf-1".into()],
797 next: None,
798 total: 5,
799 }
800 );
801 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
802 });
803
804 test_each_storage!(get_filtered_links, |storage| {
805 let links = storage.get_links(
806 "a.com",
807 "app.t.c",
808 ".abc.uri",
809 2,
810 None,
811 &HashSet::from([Did("did:plc:linker".to_string())]),
812 )?;
813 assert_eq!(
814 links,
815 PagedAppendingCollection {
816 version: (0, 0),
817 items: vec![],
818 next: None,
819 total: 0,
820 }
821 );
822
823 storage.push(
824 &ActionableEvent::CreateLinks {
825 record_id: RecordId {
826 did: "did:plc:linker".into(),
827 collection: "app.t.c".into(),
828 rkey: "asdf".into(),
829 },
830 links: vec![CollectedLink {
831 target: Link::Uri("a.com".into()),
832 path: ".abc.uri".into(),
833 }],
834 },
835 0,
836 )?;
837
838 let links = storage.get_links(
839 "a.com",
840 "app.t.c",
841 ".abc.uri",
842 2,
843 None,
844 &HashSet::from([Did("did:plc:linker".to_string())]),
845 )?;
846 assert_eq!(
847 links,
848 PagedAppendingCollection {
849 version: (1, 0),
850 items: vec![RecordId {
851 did: "did:plc:linker".into(),
852 collection: "app.t.c".into(),
853 rkey: "asdf".into(),
854 },],
855 next: None,
856 total: 1,
857 }
858 );
859
860 let links = storage.get_links(
861 "a.com",
862 "app.t.c",
863 ".abc.uri",
864 2,
865 None,
866 &HashSet::from([Did("did:plc:someone-else".to_string())]),
867 )?;
868 assert_eq!(
869 links,
870 PagedAppendingCollection {
871 version: (0, 0),
872 items: vec![],
873 next: None,
874 total: 0,
875 }
876 );
877
878 storage.push(
879 &ActionableEvent::CreateLinks {
880 record_id: RecordId {
881 did: "did:plc:linker".into(),
882 collection: "app.t.c".into(),
883 rkey: "asdf-2".into(),
884 },
885 links: vec![CollectedLink {
886 target: Link::Uri("a.com".into()),
887 path: ".abc.uri".into(),
888 }],
889 },
890 0,
891 )?;
892 storage.push(
893 &ActionableEvent::CreateLinks {
894 record_id: RecordId {
895 did: "did:plc:someone-else".into(),
896 collection: "app.t.c".into(),
897 rkey: "asdf".into(),
898 },
899 links: vec![CollectedLink {
900 target: Link::Uri("a.com".into()),
901 path: ".abc.uri".into(),
902 }],
903 },
904 0,
905 )?;
906
907 let links = storage.get_links(
908 "a.com",
909 "app.t.c",
910 ".abc.uri",
911 2,
912 None,
913 &HashSet::from([Did("did:plc:linker".to_string())]),
914 )?;
915 assert_eq!(
916 links,
917 PagedAppendingCollection {
918 version: (2, 0),
919 items: vec![
920 RecordId {
921 did: "did:plc:linker".into(),
922 collection: "app.t.c".into(),
923 rkey: "asdf-2".into(),
924 },
925 RecordId {
926 did: "did:plc:linker".into(),
927 collection: "app.t.c".into(),
928 rkey: "asdf".into(),
929 },
930 ],
931 next: None,
932 total: 2,
933 }
934 );
935
936 let links = storage.get_links(
937 "a.com",
938 "app.t.c",
939 ".abc.uri",
940 2,
941 None,
942 &HashSet::from([
943 Did("did:plc:linker".to_string()),
944 Did("did:plc:someone-else".to_string()),
945 ]),
946 )?;
947 assert_eq!(
948 links,
949 PagedAppendingCollection {
950 version: (3, 0),
951 items: vec![
952 RecordId {
953 did: "did:plc:someone-else".into(),
954 collection: "app.t.c".into(),
955 rkey: "asdf".into(),
956 },
957 RecordId {
958 did: "did:plc:linker".into(),
959 collection: "app.t.c".into(),
960 rkey: "asdf-2".into(),
961 },
962 ],
963 next: Some(1),
964 total: 3,
965 }
966 );
967
968 let links = storage.get_links(
969 "a.com",
970 "app.t.c",
971 ".abc.uri",
972 2,
973 None,
974 &HashSet::from([Did("did:plc:someone-unknown".to_string())]),
975 )?;
976 assert_eq!(
977 links,
978 PagedAppendingCollection {
979 version: (0, 0),
980 items: vec![],
981 next: None,
982 total: 0,
983 }
984 );
985 });
986
987 test_each_storage!(get_links_exact_multiple, |storage| {
988 for i in 1..=4 {
989 storage.push(
990 &ActionableEvent::CreateLinks {
991 record_id: RecordId {
992 did: format!("did:plc:asdf-{i}").into(),
993 collection: "app.t.c".into(),
994 rkey: "asdf".into(),
995 },
996 links: vec![CollectedLink {
997 target: Link::Uri("a.com".into()),
998 path: ".abc.uri".into(),
999 }],
1000 },
1001 0,
1002 )?;
1003 }
1004 let links =
1005 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1006 assert_eq!(
1007 links,
1008 PagedAppendingCollection {
1009 version: (4, 0),
1010 items: vec![
1011 RecordId {
1012 did: "did:plc:asdf-4".into(),
1013 collection: "app.t.c".into(),
1014 rkey: "asdf".into(),
1015 },
1016 RecordId {
1017 did: "did:plc:asdf-3".into(),
1018 collection: "app.t.c".into(),
1019 rkey: "asdf".into(),
1020 },
1021 ],
1022 next: Some(2),
1023 total: 4,
1024 }
1025 );
1026 let links = storage.get_links(
1027 "a.com",
1028 "app.t.c",
1029 ".abc.uri",
1030 2,
1031 links.next,
1032 &HashSet::default(),
1033 )?;
1034 assert_eq!(
1035 links,
1036 PagedAppendingCollection {
1037 version: (4, 0),
1038 items: vec![
1039 RecordId {
1040 did: "did:plc:asdf-2".into(),
1041 collection: "app.t.c".into(),
1042 rkey: "asdf".into(),
1043 },
1044 RecordId {
1045 did: "did:plc:asdf-1".into(),
1046 collection: "app.t.c".into(),
1047 rkey: "asdf".into(),
1048 },
1049 ],
1050 next: None,
1051 total: 4,
1052 }
1053 );
1054 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1055 });
1056
1057 test_each_storage!(page_links_while_new_links_arrive, |storage| {
1058 for i in 1..=4 {
1059 storage.push(
1060 &ActionableEvent::CreateLinks {
1061 record_id: RecordId {
1062 did: format!("did:plc:asdf-{i}").into(),
1063 collection: "app.t.c".into(),
1064 rkey: "asdf".into(),
1065 },
1066 links: vec![CollectedLink {
1067 target: Link::Uri("a.com".into()),
1068 path: ".abc.uri".into(),
1069 }],
1070 },
1071 0,
1072 )?;
1073 }
1074 let links =
1075 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1076 assert_eq!(
1077 links,
1078 PagedAppendingCollection {
1079 version: (4, 0),
1080 items: vec![
1081 RecordId {
1082 did: "did:plc:asdf-4".into(),
1083 collection: "app.t.c".into(),
1084 rkey: "asdf".into(),
1085 },
1086 RecordId {
1087 did: "did:plc:asdf-3".into(),
1088 collection: "app.t.c".into(),
1089 rkey: "asdf".into(),
1090 },
1091 ],
1092 next: Some(2),
1093 total: 4,
1094 }
1095 );
1096 storage.push(
1097 &ActionableEvent::CreateLinks {
1098 record_id: RecordId {
1099 did: "did:plc:asdf-5".into(),
1100 collection: "app.t.c".into(),
1101 rkey: "asdf".into(),
1102 },
1103 links: vec![CollectedLink {
1104 target: Link::Uri("a.com".into()),
1105 path: ".abc.uri".into(),
1106 }],
1107 },
1108 0,
1109 )?;
1110 let links = storage.get_links(
1111 "a.com",
1112 "app.t.c",
1113 ".abc.uri",
1114 2,
1115 links.next,
1116 &HashSet::default(),
1117 )?;
1118 assert_eq!(
1119 links,
1120 PagedAppendingCollection {
1121 version: (5, 0),
1122 items: vec![
1123 RecordId {
1124 did: "did:plc:asdf-2".into(),
1125 collection: "app.t.c".into(),
1126 rkey: "asdf".into(),
1127 },
1128 RecordId {
1129 did: "did:plc:asdf-1".into(),
1130 collection: "app.t.c".into(),
1131 rkey: "asdf".into(),
1132 },
1133 ],
1134 next: None,
1135 total: 5,
1136 }
1137 );
1138 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
1139 });
1140
1141 test_each_storage!(page_links_while_some_are_deleted, |storage| {
1142 for i in 1..=4 {
1143 storage.push(
1144 &ActionableEvent::CreateLinks {
1145 record_id: RecordId {
1146 did: format!("did:plc:asdf-{i}").into(),
1147 collection: "app.t.c".into(),
1148 rkey: "asdf".into(),
1149 },
1150 links: vec![CollectedLink {
1151 target: Link::Uri("a.com".into()),
1152 path: ".abc.uri".into(),
1153 }],
1154 },
1155 0,
1156 )?;
1157 }
1158 let links =
1159 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1160 assert_eq!(
1161 links,
1162 PagedAppendingCollection {
1163 version: (4, 0),
1164 items: vec![
1165 RecordId {
1166 did: "did:plc:asdf-4".into(),
1167 collection: "app.t.c".into(),
1168 rkey: "asdf".into(),
1169 },
1170 RecordId {
1171 did: "did:plc:asdf-3".into(),
1172 collection: "app.t.c".into(),
1173 rkey: "asdf".into(),
1174 },
1175 ],
1176 next: Some(2),
1177 total: 4,
1178 }
1179 );
1180 storage.push(
1181 &ActionableEvent::DeleteRecord(RecordId {
1182 did: "did:plc:asdf-2".into(),
1183 collection: "app.t.c".into(),
1184 rkey: "asdf".into(),
1185 }),
1186 0,
1187 )?;
1188 let links = storage.get_links(
1189 "a.com",
1190 "app.t.c",
1191 ".abc.uri",
1192 2,
1193 links.next,
1194 &HashSet::default(),
1195 )?;
1196 assert_eq!(
1197 links,
1198 PagedAppendingCollection {
1199 version: (4, 1),
1200 items: vec![RecordId {
1201 did: "did:plc:asdf-1".into(),
1202 collection: "app.t.c".into(),
1203 rkey: "asdf".into(),
1204 },],
1205 next: None,
1206 total: 3,
1207 }
1208 );
1209 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
1210 });
1211
1212 test_each_storage!(page_links_accounts_inactive, |storage| {
1213 for i in 1..=4 {
1214 storage.push(
1215 &ActionableEvent::CreateLinks {
1216 record_id: RecordId {
1217 did: format!("did:plc:asdf-{i}").into(),
1218 collection: "app.t.c".into(),
1219 rkey: "asdf".into(),
1220 },
1221 links: vec![CollectedLink {
1222 target: Link::Uri("a.com".into()),
1223 path: ".abc.uri".into(),
1224 }],
1225 },
1226 0,
1227 )?;
1228 }
1229 let links =
1230 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?;
1231 assert_eq!(
1232 links,
1233 PagedAppendingCollection {
1234 version: (4, 0),
1235 items: vec![
1236 RecordId {
1237 did: "did:plc:asdf-4".into(),
1238 collection: "app.t.c".into(),
1239 rkey: "asdf".into(),
1240 },
1241 RecordId {
1242 did: "did:plc:asdf-3".into(),
1243 collection: "app.t.c".into(),
1244 rkey: "asdf".into(),
1245 },
1246 ],
1247 next: Some(2),
1248 total: 4,
1249 }
1250 );
1251 storage.push(
1252 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1253 0,
1254 )?;
1255 let links = storage.get_links(
1256 "a.com",
1257 "app.t.c",
1258 ".abc.uri",
1259 2,
1260 links.next,
1261 &HashSet::default(),
1262 )?;
1263 assert_eq!(
1264 links,
1265 PagedAppendingCollection {
1266 version: (4, 0),
1267 items: vec![RecordId {
1268 did: "did:plc:asdf-2".into(),
1269 collection: "app.t.c".into(),
1270 rkey: "asdf".into(),
1271 },],
1272 next: None,
1273 total: 4,
1274 }
1275 );
1276 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1277 });
1278
1279 test_each_storage!(get_all_counts, |storage| {
1280 storage.push(
1281 &ActionableEvent::CreateLinks {
1282 record_id: RecordId {
1283 did: "did:plc:asdf".into(),
1284 collection: "app.t.c".into(),
1285 rkey: "asdf".into(),
1286 },
1287 links: vec![
1288 CollectedLink {
1289 target: Link::Uri("a.com".into()),
1290 path: ".abc.uri".into(),
1291 },
1292 CollectedLink {
1293 target: Link::Uri("a.com".into()),
1294 path: ".def.uri".into(),
1295 },
1296 ],
1297 },
1298 0,
1299 )?;
1300 assert_eq!(storage.get_all_record_counts("a.com")?, {
1301 let mut counts = HashMap::new();
1302 let mut t_c_counts = HashMap::new();
1303 t_c_counts.insert(".abc.uri".into(), 1);
1304 t_c_counts.insert(".def.uri".into(), 1);
1305 counts.insert("app.t.c".into(), t_c_counts);
1306 counts
1307 });
1308 assert_eq!(storage.get_all_counts("a.com")?, {
1309 let mut counts = HashMap::new();
1310 let mut t_c_counts = HashMap::new();
1311 t_c_counts.insert(
1312 ".abc.uri".into(),
1313 CountsByCount {
1314 records: 1,
1315 distinct_dids: 1,
1316 },
1317 );
1318 t_c_counts.insert(
1319 ".def.uri".into(),
1320 CountsByCount {
1321 records: 1,
1322 distinct_dids: 1,
1323 },
1324 );
1325 counts.insert("app.t.c".into(), t_c_counts);
1326 counts
1327 });
1328 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1329 });
1330}