forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId};
2use anyhow::Result;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
6pub mod mem_store;
7pub use mem_store::MemStorage;
8
9#[cfg(feature = "rocks")]
10pub mod rocks_store;
11#[cfg(feature = "rocks")]
12pub use rocks_store::RocksStorage;
13
/// One page of items read from an appending collection, together with the
/// bookkeeping a caller needs to request the following page.
#[derive(Debug, PartialEq)]
pub struct PagedAppendingCollection<T> {
    /// `(collection length, deleted item count)` as observed when the page was read.
    // TODO: change to (total, active)? since dedups isn't "deleted"
    pub version: (u64, u64),
    /// The items that make up this page.
    pub items: Vec<T>,
    /// Cursor for the following page; `None` when there is nothing further to read.
    /// The tests in this file feed it back as the `until` argument of the reader methods.
    pub next: Option<u64>,
    /// Item count reported for the whole collection.
    /// NOTE(review): the tests show this dropping after a delete while `version.0`
    /// stays put, so it appears to count live items only — confirm with the backends.
    pub total: u64,
}
21
22#[derive(Debug, Deserialize, Serialize, PartialEq)]
23pub struct StorageStats {
24 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
25 /// for example: new user A follows users B and C. this count will only increment by one, for A.
26 pub dids: u64,
27
28 /// estimate targets * distinct (collection, path)s to reference them.
29 /// distinct targets alone are currently challenging to estimate.
30 pub targetables: u64,
31
32 /// estimate of the count of atproto records seen that contain links.
33 /// records with multiple links are single-counted.
34 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35 pub linking_records: u64,
36}
37
38pub trait LinkStorage: Send + Sync {
39 /// jetstream cursor from last saved actions, if available
40 fn get_cursor(&mut self) -> Result<Option<u64>> {
41 Ok(None)
42 }
43
44 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>;
45
46 // readers are off from the writer instance
47 fn to_readable(&mut self) -> impl LinkReader;
48}
49
50pub trait LinkReader: Clone + Send + Sync + 'static {
51 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
53 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
54
55 fn get_links(
56 &self,
57 target: &str,
58 collection: &str,
59 path: &str,
60 limit: u64,
61 until: Option<u64>,
62 ) -> Result<PagedAppendingCollection<RecordId>>;
63
64 fn get_distinct_dids(
65 &self,
66 target: &str,
67 collection: &str,
68 path: &str,
69 limit: u64,
70 until: Option<u64>,
71 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
72
73 fn get_all_record_counts(&self, _target: &str)
74 -> Result<HashMap<String, HashMap<String, u64>>>;
75
76 fn get_all_counts(
77 &self,
78 _target: &str,
79 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>;
80
81 /// assume all stats are estimates, since exact counts are very challenging for LSMs
82 fn get_stats(&self) -> Result<StorageStats>;
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use links::{CollectedLink, Link};
89 use std::ops::RangeBounds;
90
// Generates a single `#[test]` function that runs the same body against every
// available storage backend: always `MemStorage`, and additionally
// `RocksStorage` (opened in a fresh temporary directory) when the `rocks`
// feature is enabled.
//
// Usage: `test_each_storage!(test_name, |storage| { /* body */ });`
// The body may use `?`; the generated test returns `Result<()>`.
macro_rules! test_each_storage {
    ($test_name:ident, |$storage_label:ident| $test_code:block) => {
        #[test]
        fn $test_name() -> Result<()> {
            {
                println!("=> testing with memstorage backend");
                // Read-only test bodies never mutate the binding.
                #[allow(unused_mut)]
                let mut $storage_label = MemStorage::new();
                $test_code
            }

            #[cfg(feature = "rocks")]
            {
                println!("=> testing with rocksdb backend");
                let db_dir = tempfile::tempdir()?;
                // Read-only test bodies never mutate the binding.
                #[allow(unused_mut)]
                let mut $storage_label = RocksStorage::new(db_dir.path())?;
                $test_code
            }

            Ok(())
        }
    };
}
115
116 fn assert_stats(
117 stats: StorageStats,
118 dids: impl RangeBounds<u64>,
119 targetables: impl RangeBounds<u64>,
120 linking_records: impl RangeBounds<u64>,
121 ) {
122 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) {
123 assert!(
124 rb.contains(&stat),
125 "{name:?}: {stat:?} not in range {:?}–{:?}",
126 rb.start_bound(),
127 rb.end_bound()
128 );
129 }
130 check("dids", stats.dids, dids);
131 check("targetables", stats.targetables, targetables);
132 check("linking_records", stats.linking_records, linking_records);
133 }
134
135 test_each_storage!(test_empty, |storage| {
136 assert_eq!(storage.get_count("", "", "")?, 0);
137 assert_eq!(storage.get_count("a", "b", "c")?, 0);
138 assert_eq!(
139 storage.get_count(
140 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f",
141 "app.t.c",
142 ".reply.parent.uri"
143 )?,
144 0
145 );
146 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
147 assert_eq!(
148 storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?,
149 PagedAppendingCollection {
150 version: (0, 0),
151 items: vec![],
152 next: None,
153 total: 0,
154 }
155 );
156 assert_eq!(
157 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
158 PagedAppendingCollection {
159 version: (0, 0),
160 items: vec![],
161 next: None,
162 total: 0,
163 }
164 );
165 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new());
166 assert_eq!(
167 storage.get_all_record_counts("bad-example.com")?,
168 HashMap::new()
169 );
170
171 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0);
172 });
173
174 test_each_storage!(test_add_link, |storage| {
175 storage.push(
176 &ActionableEvent::CreateLinks {
177 record_id: RecordId {
178 did: "did:plc:asdf".into(),
179 collection: "app.t.c".into(),
180 rkey: "fdsa".into(),
181 },
182 links: vec![CollectedLink {
183 target: Link::Uri("e.com".into()),
184 path: ".abc.uri".into(),
185 }],
186 },
187 0,
188 )?;
189 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
190 assert_eq!(
191 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
192 1
193 );
194 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0);
195 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0);
196 assert_eq!(
197 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?,
198 0
199 );
200 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
201 });
202
203 test_each_storage!(test_links, |storage| {
204 storage.push(
205 &ActionableEvent::CreateLinks {
206 record_id: RecordId {
207 did: "did:plc:asdf".into(),
208 collection: "app.t.c".into(),
209 rkey: "fdsa".into(),
210 },
211 links: vec![CollectedLink {
212 target: Link::Uri("e.com".into()),
213 path: ".abc.uri".into(),
214 }],
215 },
216 0,
217 )?;
218
219 // delete under the wrong collection
220 storage.push(
221 &ActionableEvent::DeleteRecord(RecordId {
222 did: "did:plc:asdf".into(),
223 collection: "app.test.wrongcollection".into(),
224 rkey: "fdsa".into(),
225 }),
226 0,
227 )?;
228 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
229
230 // delete under the wrong rkey
231 storage.push(
232 &ActionableEvent::DeleteRecord(RecordId {
233 did: "did:plc:asdf".into(),
234 collection: "app.t.c".into(),
235 rkey: "wrongkey".into(),
236 }),
237 0,
238 )?;
239 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
240
241 // finally actually delete it
242 storage.push(
243 &ActionableEvent::DeleteRecord(RecordId {
244 did: "did:plc:asdf".into(),
245 collection: "app.t.c".into(),
246 rkey: "fdsa".into(),
247 }),
248 0,
249 )?;
250 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
251
252 // put it back
253 storage.push(
254 &ActionableEvent::CreateLinks {
255 record_id: RecordId {
256 did: "did:plc:asdf".into(),
257 collection: "app.t.c".into(),
258 rkey: "fdsa".into(),
259 },
260 links: vec![CollectedLink {
261 target: Link::Uri("e.com".into()),
262 path: ".abc.uri".into(),
263 }],
264 },
265 0,
266 )?;
267 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
268 assert_eq!(
269 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
270 1
271 );
272
273 // add another link from this user
274 storage.push(
275 &ActionableEvent::CreateLinks {
276 record_id: RecordId {
277 did: "did:plc:asdf".into(),
278 collection: "app.t.c".into(),
279 rkey: "fdsa2".into(),
280 },
281 links: vec![CollectedLink {
282 target: Link::Uri("e.com".into()),
283 path: ".abc.uri".into(),
284 }],
285 },
286 0,
287 )?;
288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
289 assert_eq!(
290 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
291 1
292 );
293
294 // add a link from someone else
295 storage.push(
296 &ActionableEvent::CreateLinks {
297 record_id: RecordId {
298 did: "did:plc:asdfasdf".into(),
299 collection: "app.t.c".into(),
300 rkey: "fdsa".into(),
301 },
302 links: vec![CollectedLink {
303 target: Link::Uri("e.com".into()),
304 path: ".abc.uri".into(),
305 }],
306 },
307 0,
308 )?;
309 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3);
310 assert_eq!(
311 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
312 2
313 );
314
315 // aaaand delete the first one again
316 storage.push(
317 &ActionableEvent::DeleteRecord(RecordId {
318 did: "did:plc:asdf".into(),
319 collection: "app.t.c".into(),
320 rkey: "fdsa".into(),
321 }),
322 0,
323 )?;
324 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
325 assert_eq!(
326 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
327 2
328 );
329 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2);
330 });
331
332 test_each_storage!(test_two_user_links_delete_one, |storage| {
333 // create the first link
334 storage.push(
335 &ActionableEvent::CreateLinks {
336 record_id: RecordId {
337 did: "did:plc:asdf".into(),
338 collection: "app.t.c".into(),
339 rkey: "A".into(),
340 },
341 links: vec![CollectedLink {
342 target: Link::Uri("e.com".into()),
343 path: ".abc.uri".into(),
344 }],
345 },
346 0,
347 )?;
348 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
349 assert_eq!(
350 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
351 1
352 );
353
354 // create the second link (same user, different rkey)
355 storage.push(
356 &ActionableEvent::CreateLinks {
357 record_id: RecordId {
358 did: "did:plc:asdf".into(),
359 collection: "app.t.c".into(),
360 rkey: "B".into(),
361 },
362 links: vec![CollectedLink {
363 target: Link::Uri("e.com".into()),
364 path: ".abc.uri".into(),
365 }],
366 },
367 0,
368 )?;
369 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2);
370 assert_eq!(
371 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
372 1
373 );
374
375 // aaaand delete the first link
376 storage.push(
377 &ActionableEvent::DeleteRecord(RecordId {
378 did: "did:plc:asdf".into(),
379 collection: "app.t.c".into(),
380 rkey: "A".into(),
381 }),
382 0,
383 )?;
384 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
385 assert_eq!(
386 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
387 1
388 );
389 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
390 });
391
392 test_each_storage!(test_accounts, |storage| {
393 // create two links
394 storage.push(
395 &ActionableEvent::CreateLinks {
396 record_id: RecordId {
397 did: "did:plc:asdf".into(),
398 collection: "app.t.c".into(),
399 rkey: "A".into(),
400 },
401 links: vec![CollectedLink {
402 target: Link::Uri("a.com".into()),
403 path: ".abc.uri".into(),
404 }],
405 },
406 0,
407 )?;
408 storage.push(
409 &ActionableEvent::CreateLinks {
410 record_id: RecordId {
411 did: "did:plc:asdf".into(),
412 collection: "app.t.c".into(),
413 rkey: "B".into(),
414 },
415 links: vec![CollectedLink {
416 target: Link::Uri("b.com".into()),
417 path: ".abc.uri".into(),
418 }],
419 },
420 0,
421 )?;
422 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
423 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1);
424
425 // and a third from a different account
426 storage.push(
427 &ActionableEvent::CreateLinks {
428 record_id: RecordId {
429 did: "did:plc:fdsa".into(),
430 collection: "app.t.c".into(),
431 rkey: "A".into(),
432 },
433 links: vec![CollectedLink {
434 target: Link::Uri("a.com".into()),
435 path: ".abc.uri".into(),
436 }],
437 },
438 0,
439 )?;
440 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2);
441
442 // delete the first account
443 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?;
444 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
445 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0);
446 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1);
447 });
448
449 test_each_storage!(multi_link, |storage| {
450 storage.push(
451 &ActionableEvent::CreateLinks {
452 record_id: RecordId {
453 did: "did:plc:asdf".into(),
454 collection: "app.t.c".into(),
455 rkey: "fdsa".into(),
456 },
457 links: vec![
458 CollectedLink {
459 target: Link::Uri("e.com".into()),
460 path: ".abc.uri".into(),
461 },
462 CollectedLink {
463 target: Link::Uri("f.com".into()),
464 path: ".xyz[].uri".into(),
465 },
466 CollectedLink {
467 target: Link::Uri("g.com".into()),
468 path: ".xyz[].uri".into(),
469 },
470 ],
471 },
472 0,
473 )?;
474 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
475 assert_eq!(
476 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
477 1
478 );
479 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
480 assert_eq!(
481 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?,
482 1
483 );
484 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
485 assert_eq!(
486 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?,
487 1
488 );
489
490 storage.push(
491 &ActionableEvent::DeleteRecord(RecordId {
492 did: "did:plc:asdf".into(),
493 collection: "app.t.c".into(),
494 rkey: "fdsa".into(),
495 }),
496 0,
497 )?;
498 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
499 assert_eq!(
500 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?,
501 0
502 );
503 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0);
504 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
505 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0);
506 });
507
508 test_each_storage!(update_link, |storage| {
509 // create the links
510 storage.push(
511 &ActionableEvent::CreateLinks {
512 record_id: RecordId {
513 did: "did:plc:asdf".into(),
514 collection: "app.t.c".into(),
515 rkey: "fdsa".into(),
516 },
517 links: vec![
518 CollectedLink {
519 target: Link::Uri("e.com".into()),
520 path: ".abc.uri".into(),
521 },
522 CollectedLink {
523 target: Link::Uri("f.com".into()),
524 path: ".xyz[].uri".into(),
525 },
526 CollectedLink {
527 target: Link::Uri("g.com".into()),
528 path: ".xyz[].uri".into(),
529 },
530 ],
531 },
532 0,
533 )?;
534 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1);
535 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
536 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1);
537
538 // update them
539 storage.push(
540 &ActionableEvent::UpdateLinks {
541 record_id: RecordId {
542 did: "did:plc:asdf".into(),
543 collection: "app.t.c".into(),
544 rkey: "fdsa".into(),
545 },
546 new_links: vec![
547 CollectedLink {
548 target: Link::Uri("h.com".into()),
549 path: ".abc.uri".into(),
550 },
551 CollectedLink {
552 target: Link::Uri("f.com".into()),
553 path: ".xyz[].uri".into(),
554 },
555 CollectedLink {
556 target: Link::Uri("i.com".into()),
557 path: ".xyz[].uri".into(),
558 },
559 ],
560 },
561 0,
562 )?;
563 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0);
564 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1);
565 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1);
566 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0);
567 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1);
568 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1);
569 });
570
571 test_each_storage!(update_no_links_to_links, |storage| {
572 // update without prior create (consumer would have filtered out the original)
573 storage.push(
574 &ActionableEvent::UpdateLinks {
575 record_id: RecordId {
576 did: "did:plc:asdf".into(),
577 collection: "app.t.c".into(),
578 rkey: "asdf".into(),
579 },
580 new_links: vec![CollectedLink {
581 target: Link::Uri("a.com".into()),
582 path: ".abc.uri".into(),
583 }],
584 },
585 0,
586 )?;
587 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
588 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
589 });
590
591 test_each_storage!(delete_multi_link_same_target, |storage| {
592 storage.push(
593 &ActionableEvent::CreateLinks {
594 record_id: RecordId {
595 did: "did:plc:asdf".into(),
596 collection: "app.t.c".into(),
597 rkey: "asdf".into(),
598 },
599 links: vec![
600 CollectedLink {
601 target: Link::Uri("a.com".into()),
602 path: ".abc.uri".into(),
603 },
604 CollectedLink {
605 target: Link::Uri("a.com".into()),
606 path: ".def.uri".into(),
607 },
608 ],
609 },
610 0,
611 )?;
612 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1);
613 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1);
614
615 storage.push(
616 &ActionableEvent::DeleteRecord(RecordId {
617 did: "did:plc:asdf".into(),
618 collection: "app.t.c".into(),
619 rkey: "asdf".into(),
620 }),
621 0,
622 )?;
623 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0);
624 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0);
625 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0);
626 });
627
628 test_each_storage!(get_links_basic, |storage| {
629 storage.push(
630 &ActionableEvent::CreateLinks {
631 record_id: RecordId {
632 did: "did:plc:asdf".into(),
633 collection: "app.t.c".into(),
634 rkey: "asdf".into(),
635 },
636 links: vec![CollectedLink {
637 target: Link::Uri("a.com".into()),
638 path: ".abc.uri".into(),
639 }],
640 },
641 0,
642 )?;
643 assert_eq!(
644 storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?,
645 PagedAppendingCollection {
646 version: (1, 0),
647 items: vec![RecordId {
648 did: "did:plc:asdf".into(),
649 collection: "app.t.c".into(),
650 rkey: "asdf".into(),
651 }],
652 next: None,
653 total: 1,
654 }
655 );
656 assert_eq!(
657 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?,
658 PagedAppendingCollection {
659 version: (1, 0),
660 items: vec!["did:plc:asdf".into()],
661 next: None,
662 total: 1,
663 }
664 );
665 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
666 });
667
668 test_each_storage!(get_links_paged, |storage| {
669 for i in 1..=5 {
670 storage.push(
671 &ActionableEvent::CreateLinks {
672 record_id: RecordId {
673 did: format!("did:plc:asdf-{i}").into(),
674 collection: "app.t.c".into(),
675 rkey: "asdf".into(),
676 },
677 links: vec![CollectedLink {
678 target: Link::Uri("a.com".into()),
679 path: ".abc.uri".into(),
680 }],
681 },
682 0,
683 )?;
684 }
685 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
686 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?;
687 assert_eq!(
688 links,
689 PagedAppendingCollection {
690 version: (5, 0),
691 items: vec![
692 RecordId {
693 did: "did:plc:asdf-5".into(),
694 collection: "app.t.c".into(),
695 rkey: "asdf".into(),
696 },
697 RecordId {
698 did: "did:plc:asdf-4".into(),
699 collection: "app.t.c".into(),
700 rkey: "asdf".into(),
701 },
702 ],
703 next: Some(3),
704 total: 5,
705 }
706 );
707 assert_eq!(
708 dids,
709 PagedAppendingCollection {
710 version: (5, 0),
711 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()],
712 next: Some(3),
713 total: 5,
714 }
715 );
716 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
717 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
718 assert_eq!(
719 links,
720 PagedAppendingCollection {
721 version: (5, 0),
722 items: vec![
723 RecordId {
724 did: "did:plc:asdf-3".into(),
725 collection: "app.t.c".into(),
726 rkey: "asdf".into(),
727 },
728 RecordId {
729 did: "did:plc:asdf-2".into(),
730 collection: "app.t.c".into(),
731 rkey: "asdf".into(),
732 },
733 ],
734 next: Some(1),
735 total: 5,
736 }
737 );
738 assert_eq!(
739 dids,
740 PagedAppendingCollection {
741 version: (5, 0),
742 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()],
743 next: Some(1),
744 total: 5,
745 }
746 );
747 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
748 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
749 assert_eq!(
750 links,
751 PagedAppendingCollection {
752 version: (5, 0),
753 items: vec![RecordId {
754 did: "did:plc:asdf-1".into(),
755 collection: "app.t.c".into(),
756 rkey: "asdf".into(),
757 },],
758 next: None,
759 total: 5,
760 }
761 );
762 assert_eq!(
763 dids,
764 PagedAppendingCollection {
765 version: (5, 0),
766 items: vec!["did:plc:asdf-1".into()],
767 next: None,
768 total: 5,
769 }
770 );
771 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
772 });
773
774 test_each_storage!(get_links_exact_multiple, |storage| {
775 for i in 1..=4 {
776 storage.push(
777 &ActionableEvent::CreateLinks {
778 record_id: RecordId {
779 did: format!("did:plc:asdf-{i}").into(),
780 collection: "app.t.c".into(),
781 rkey: "asdf".into(),
782 },
783 links: vec![CollectedLink {
784 target: Link::Uri("a.com".into()),
785 path: ".abc.uri".into(),
786 }],
787 },
788 0,
789 )?;
790 }
791 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
792 assert_eq!(
793 links,
794 PagedAppendingCollection {
795 version: (4, 0),
796 items: vec![
797 RecordId {
798 did: "did:plc:asdf-4".into(),
799 collection: "app.t.c".into(),
800 rkey: "asdf".into(),
801 },
802 RecordId {
803 did: "did:plc:asdf-3".into(),
804 collection: "app.t.c".into(),
805 rkey: "asdf".into(),
806 },
807 ],
808 next: Some(2),
809 total: 4,
810 }
811 );
812 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
813 assert_eq!(
814 links,
815 PagedAppendingCollection {
816 version: (4, 0),
817 items: vec![
818 RecordId {
819 did: "did:plc:asdf-2".into(),
820 collection: "app.t.c".into(),
821 rkey: "asdf".into(),
822 },
823 RecordId {
824 did: "did:plc:asdf-1".into(),
825 collection: "app.t.c".into(),
826 rkey: "asdf".into(),
827 },
828 ],
829 next: None,
830 total: 4,
831 }
832 );
833 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
834 });
835
836 test_each_storage!(page_links_while_new_links_arrive, |storage| {
837 for i in 1..=4 {
838 storage.push(
839 &ActionableEvent::CreateLinks {
840 record_id: RecordId {
841 did: format!("did:plc:asdf-{i}").into(),
842 collection: "app.t.c".into(),
843 rkey: "asdf".into(),
844 },
845 links: vec![CollectedLink {
846 target: Link::Uri("a.com".into()),
847 path: ".abc.uri".into(),
848 }],
849 },
850 0,
851 )?;
852 }
853 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
854 assert_eq!(
855 links,
856 PagedAppendingCollection {
857 version: (4, 0),
858 items: vec![
859 RecordId {
860 did: "did:plc:asdf-4".into(),
861 collection: "app.t.c".into(),
862 rkey: "asdf".into(),
863 },
864 RecordId {
865 did: "did:plc:asdf-3".into(),
866 collection: "app.t.c".into(),
867 rkey: "asdf".into(),
868 },
869 ],
870 next: Some(2),
871 total: 4,
872 }
873 );
874 storage.push(
875 &ActionableEvent::CreateLinks {
876 record_id: RecordId {
877 did: "did:plc:asdf-5".into(),
878 collection: "app.t.c".into(),
879 rkey: "asdf".into(),
880 },
881 links: vec![CollectedLink {
882 target: Link::Uri("a.com".into()),
883 path: ".abc.uri".into(),
884 }],
885 },
886 0,
887 )?;
888 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
889 assert_eq!(
890 links,
891 PagedAppendingCollection {
892 version: (5, 0),
893 items: vec![
894 RecordId {
895 did: "did:plc:asdf-2".into(),
896 collection: "app.t.c".into(),
897 rkey: "asdf".into(),
898 },
899 RecordId {
900 did: "did:plc:asdf-1".into(),
901 collection: "app.t.c".into(),
902 rkey: "asdf".into(),
903 },
904 ],
905 next: None,
906 total: 5,
907 }
908 );
909 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5);
910 });
911
912 test_each_storage!(page_links_while_some_are_deleted, |storage| {
913 for i in 1..=4 {
914 storage.push(
915 &ActionableEvent::CreateLinks {
916 record_id: RecordId {
917 did: format!("did:plc:asdf-{i}").into(),
918 collection: "app.t.c".into(),
919 rkey: "asdf".into(),
920 },
921 links: vec![CollectedLink {
922 target: Link::Uri("a.com".into()),
923 path: ".abc.uri".into(),
924 }],
925 },
926 0,
927 )?;
928 }
929 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
930 assert_eq!(
931 links,
932 PagedAppendingCollection {
933 version: (4, 0),
934 items: vec![
935 RecordId {
936 did: "did:plc:asdf-4".into(),
937 collection: "app.t.c".into(),
938 rkey: "asdf".into(),
939 },
940 RecordId {
941 did: "did:plc:asdf-3".into(),
942 collection: "app.t.c".into(),
943 rkey: "asdf".into(),
944 },
945 ],
946 next: Some(2),
947 total: 4,
948 }
949 );
950 storage.push(
951 &ActionableEvent::DeleteRecord(RecordId {
952 did: "did:plc:asdf-2".into(),
953 collection: "app.t.c".into(),
954 rkey: "asdf".into(),
955 }),
956 0,
957 )?;
958 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
959 assert_eq!(
960 links,
961 PagedAppendingCollection {
962 version: (4, 1),
963 items: vec![RecordId {
964 did: "did:plc:asdf-1".into(),
965 collection: "app.t.c".into(),
966 rkey: "asdf".into(),
967 },],
968 next: None,
969 total: 3,
970 }
971 );
972 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3);
973 });
974
975 test_each_storage!(page_links_accounts_inactive, |storage| {
976 for i in 1..=4 {
977 storage.push(
978 &ActionableEvent::CreateLinks {
979 record_id: RecordId {
980 did: format!("did:plc:asdf-{i}").into(),
981 collection: "app.t.c".into(),
982 rkey: "asdf".into(),
983 },
984 links: vec![CollectedLink {
985 target: Link::Uri("a.com".into()),
986 path: ".abc.uri".into(),
987 }],
988 },
989 0,
990 )?;
991 }
992 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
993 assert_eq!(
994 links,
995 PagedAppendingCollection {
996 version: (4, 0),
997 items: vec![
998 RecordId {
999 did: "did:plc:asdf-4".into(),
1000 collection: "app.t.c".into(),
1001 rkey: "asdf".into(),
1002 },
1003 RecordId {
1004 did: "did:plc:asdf-3".into(),
1005 collection: "app.t.c".into(),
1006 rkey: "asdf".into(),
1007 },
1008 ],
1009 next: Some(2),
1010 total: 4,
1011 }
1012 );
1013 storage.push(
1014 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1015 0,
1016 )?;
1017 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
1018 assert_eq!(
1019 links,
1020 PagedAppendingCollection {
1021 version: (4, 0),
1022 items: vec![RecordId {
1023 did: "did:plc:asdf-2".into(),
1024 collection: "app.t.c".into(),
1025 rkey: "asdf".into(),
1026 },],
1027 next: None,
1028 total: 4,
1029 }
1030 );
1031 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
1032 });
1033
1034 test_each_storage!(get_all_counts, |storage| {
1035 storage.push(
1036 &ActionableEvent::CreateLinks {
1037 record_id: RecordId {
1038 did: "did:plc:asdf".into(),
1039 collection: "app.t.c".into(),
1040 rkey: "asdf".into(),
1041 },
1042 links: vec![
1043 CollectedLink {
1044 target: Link::Uri("a.com".into()),
1045 path: ".abc.uri".into(),
1046 },
1047 CollectedLink {
1048 target: Link::Uri("a.com".into()),
1049 path: ".def.uri".into(),
1050 },
1051 ],
1052 },
1053 0,
1054 )?;
1055 assert_eq!(storage.get_all_record_counts("a.com")?, {
1056 let mut counts = HashMap::new();
1057 let mut t_c_counts = HashMap::new();
1058 t_c_counts.insert(".abc.uri".into(), 1);
1059 t_c_counts.insert(".def.uri".into(), 1);
1060 counts.insert("app.t.c".into(), t_c_counts);
1061 counts
1062 });
1063 assert_eq!(storage.get_all_counts("a.com")?, {
1064 let mut counts = HashMap::new();
1065 let mut t_c_counts = HashMap::new();
1066 t_c_counts.insert(
1067 ".abc.uri".into(),
1068 CountsByCount {
1069 records: 1,
1070 distinct_dids: 1,
1071 },
1072 );
1073 t_c_counts.insert(
1074 ".def.uri".into(),
1075 CountsByCount {
1076 records: 1,
1077 distinct_dids: 1,
1078 },
1079 );
1080 counts.insert("app.t.c".into(), t_c_counts);
1081 counts
1082 });
1083 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1084 });
1085}