Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2use anyhow::Result; 3use serde::{Deserialize, Serialize}; 4use std::collections::{HashMap, HashSet}; 5 6pub mod mem_store; 7pub use mem_store::MemStorage; 8 9#[cfg(feature = "rocks")] 10pub mod rocks_store; 11#[cfg(feature = "rocks")] 12pub use rocks_store::RocksStorage; 13 14#[derive(Debug, PartialEq)] 15pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 17 pub items: Vec<T>, 18 pub next: Option<u64>, 19 pub total: u64, 20} 21 22/// A paged collection whose keys are sorted instead of indexed 23/// 24/// this has weaker guarantees than PagedAppendingCollection: it might 25/// not return a totally consistent snapshot. but it should avoid duplicates 26/// and each page should at least be internally consistent. 27#[derive(Debug, PartialEq, Default)] 28pub struct PagedOrderedCollection<T, K: Ord> { 29 pub items: Vec<T>, 30 pub next: Option<K>, 31} 32 33#[derive(Debug, Deserialize, Serialize, PartialEq)] 34pub struct StorageStats { 35 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 36 /// for example: new user A follows users B and C. this count will only increment by one, for A. 37 pub dids: u64, 38 39 /// estimate targets * distinct (collection, path)s to reference them. 40 /// distinct targets alone are currently challenging to estimate. 41 pub targetables: u64, 42 43 /// estimate of the count of atproto records seen that contain links. 44 /// records with multiple links are single-counted. 45 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 
46 pub linking_records: u64, 47 48 /// first jetstream cursor when this instance first started 49 pub started_at: Option<u64>, 50 51 /// anything else we want to throw in 52 pub other_data: HashMap<String, u64>, 53} 54 55pub trait LinkStorage: Send + Sync { 56 /// jetstream cursor from last saved actions, if available 57 fn get_cursor(&mut self) -> Result<Option<u64>> { 58 Ok(None) 59 } 60 61 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>; 62 63 // readers are off from the writer instance 64 fn to_readable(&mut self) -> impl LinkReader; 65} 66 67pub trait LinkReader: Clone + Send + Sync + 'static { 68 #[allow(clippy::too_many_arguments)] 69 fn get_many_to_many_counts( 70 &self, 71 target: &str, 72 collection: &str, 73 path: &str, 74 path_to_other: &str, 75 limit: u64, 76 after: Option<String>, 77 filter_dids: &HashSet<Did>, 78 filter_to_targets: &HashSet<String>, 79 ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>; 80 81 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 82 83 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 84 85 fn get_links( 86 &self, 87 target: &str, 88 collection: &str, 89 path: &str, 90 limit: u64, 91 until: Option<u64>, 92 filter_dids: &HashSet<Did>, 93 ) -> Result<PagedAppendingCollection<RecordId>>; 94 95 fn get_distinct_dids( 96 &self, 97 target: &str, 98 collection: &str, 99 path: &str, 100 limit: u64, 101 until: Option<u64>, 102 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 103 104 fn get_all_record_counts(&self, _target: &str) 105 -> Result<HashMap<String, HashMap<String, u64>>>; 106 107 fn get_all_counts( 108 &self, 109 _target: &str, 110 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>; 111 112 /// assume all stats are estimates, since exact counts are very challenging for LSMs 113 fn get_stats(&self) -> Result<StorageStats>; 114} 115 116#[cfg(test)] 117mod tests { 118 use 
super::*; 119 use links::{CollectedLink, Link}; 120 use std::ops::RangeBounds; 121 122 macro_rules! test_each_storage { 123 ($test_name:ident, |$storage_label:ident| $test_code:block) => { 124 #[test] 125 fn $test_name() -> Result<()> { 126 { 127 println!("=> testing with memstorage backend"); 128 #[allow(unused_mut)] 129 let mut $storage_label = MemStorage::new(); 130 $test_code 131 } 132 133 #[cfg(feature = "rocks")] 134 { 135 println!("=> testing with rocksdb backend"); 136 let rocks_db_path = tempfile::tempdir()?; 137 #[allow(unused_mut)] 138 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?; 139 $test_code 140 } 141 142 Ok(()) 143 } 144 }; 145 } 146 147 fn assert_stats( 148 stats: StorageStats, 149 dids: impl RangeBounds<u64>, 150 targetables: impl RangeBounds<u64>, 151 linking_records: impl RangeBounds<u64>, 152 ) { 153 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) { 154 assert!( 155 rb.contains(&stat), 156 "{name:?}: {stat:?} not in range {:?}–{:?}", 157 rb.start_bound(), 158 rb.end_bound() 159 ); 160 } 161 check("dids", stats.dids, dids); 162 check("targetables", stats.targetables, targetables); 163 check("linking_records", stats.linking_records, linking_records); 164 } 165 166 test_each_storage!(test_empty, |storage| { 167 assert_eq!(storage.get_count("", "", "")?, 0); 168 assert_eq!(storage.get_count("a", "b", "c")?, 0); 169 assert_eq!( 170 storage.get_count( 171 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f", 172 "app.t.c", 173 ".reply.parent.uri" 174 )?, 175 0 176 ); 177 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 178 assert_eq!( 179 storage.get_links( 180 "a.com", 181 "app.t.c", 182 ".abc.uri", 183 100, 184 None, 185 &HashSet::default() 186 )?, 187 PagedAppendingCollection { 188 version: (0, 0), 189 items: vec![], 190 next: None, 191 total: 0, 192 } 193 ); 194 assert_eq!( 195 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 196 PagedAppendingCollection { 197 
version: (0, 0), 198 items: vec![], 199 next: None, 200 total: 0, 201 } 202 ); 203 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 204 assert_eq!( 205 storage.get_all_record_counts("bad-example.com")?, 206 HashMap::new() 207 ); 208 209 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 210 }); 211 212 test_each_storage!(test_add_link, |storage| { 213 storage.push( 214 &ActionableEvent::CreateLinks { 215 record_id: RecordId { 216 did: "did:plc:asdf".into(), 217 collection: "app.t.c".into(), 218 rkey: "fdsa".into(), 219 }, 220 links: vec![CollectedLink { 221 target: Link::Uri("e.com".into()), 222 path: ".abc.uri".into(), 223 }], 224 }, 225 0, 226 )?; 227 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 228 assert_eq!( 229 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 230 1 231 ); 232 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 233 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 234 assert_eq!( 235 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 236 0 237 ); 238 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 239 }); 240 241 test_each_storage!(test_links, |storage| { 242 storage.push( 243 &ActionableEvent::CreateLinks { 244 record_id: RecordId { 245 did: "did:plc:asdf".into(), 246 collection: "app.t.c".into(), 247 rkey: "fdsa".into(), 248 }, 249 links: vec![CollectedLink { 250 target: Link::Uri("e.com".into()), 251 path: ".abc.uri".into(), 252 }], 253 }, 254 0, 255 )?; 256 257 // delete under the wrong collection 258 storage.push( 259 &ActionableEvent::DeleteRecord(RecordId { 260 did: "did:plc:asdf".into(), 261 collection: "app.test.wrongcollection".into(), 262 rkey: "fdsa".into(), 263 }), 264 0, 265 )?; 266 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 267 268 // delete under the wrong rkey 269 storage.push( 270 &ActionableEvent::DeleteRecord(RecordId { 271 did: "did:plc:asdf".into(), 272 collection: 
"app.t.c".into(), 273 rkey: "wrongkey".into(), 274 }), 275 0, 276 )?; 277 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 278 279 // finally actually delete it 280 storage.push( 281 &ActionableEvent::DeleteRecord(RecordId { 282 did: "did:plc:asdf".into(), 283 collection: "app.t.c".into(), 284 rkey: "fdsa".into(), 285 }), 286 0, 287 )?; 288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 289 290 // put it back 291 storage.push( 292 &ActionableEvent::CreateLinks { 293 record_id: RecordId { 294 did: "did:plc:asdf".into(), 295 collection: "app.t.c".into(), 296 rkey: "fdsa".into(), 297 }, 298 links: vec![CollectedLink { 299 target: Link::Uri("e.com".into()), 300 path: ".abc.uri".into(), 301 }], 302 }, 303 0, 304 )?; 305 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 306 assert_eq!( 307 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 308 1 309 ); 310 311 // add another link from this user 312 storage.push( 313 &ActionableEvent::CreateLinks { 314 record_id: RecordId { 315 did: "did:plc:asdf".into(), 316 collection: "app.t.c".into(), 317 rkey: "fdsa2".into(), 318 }, 319 links: vec![CollectedLink { 320 target: Link::Uri("e.com".into()), 321 path: ".abc.uri".into(), 322 }], 323 }, 324 0, 325 )?; 326 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 327 assert_eq!( 328 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 329 1 330 ); 331 332 // add a link from someone else 333 storage.push( 334 &ActionableEvent::CreateLinks { 335 record_id: RecordId { 336 did: "did:plc:asdfasdf".into(), 337 collection: "app.t.c".into(), 338 rkey: "fdsa".into(), 339 }, 340 links: vec![CollectedLink { 341 target: Link::Uri("e.com".into()), 342 path: ".abc.uri".into(), 343 }], 344 }, 345 0, 346 )?; 347 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 348 assert_eq!( 349 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 350 2 351 ); 352 353 // aaaand delete the 
first one again 354 storage.push( 355 &ActionableEvent::DeleteRecord(RecordId { 356 did: "did:plc:asdf".into(), 357 collection: "app.t.c".into(), 358 rkey: "fdsa".into(), 359 }), 360 0, 361 )?; 362 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 363 assert_eq!( 364 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 365 2 366 ); 367 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 368 }); 369 370 test_each_storage!(test_two_user_links_delete_one, |storage| { 371 // create the first link 372 storage.push( 373 &ActionableEvent::CreateLinks { 374 record_id: RecordId { 375 did: "did:plc:asdf".into(), 376 collection: "app.t.c".into(), 377 rkey: "A".into(), 378 }, 379 links: vec![CollectedLink { 380 target: Link::Uri("e.com".into()), 381 path: ".abc.uri".into(), 382 }], 383 }, 384 0, 385 )?; 386 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 387 assert_eq!( 388 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 389 1 390 ); 391 392 // create the second link (same user, different rkey) 393 storage.push( 394 &ActionableEvent::CreateLinks { 395 record_id: RecordId { 396 did: "did:plc:asdf".into(), 397 collection: "app.t.c".into(), 398 rkey: "B".into(), 399 }, 400 links: vec![CollectedLink { 401 target: Link::Uri("e.com".into()), 402 path: ".abc.uri".into(), 403 }], 404 }, 405 0, 406 )?; 407 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 408 assert_eq!( 409 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 410 1 411 ); 412 413 // aaaand delete the first link 414 storage.push( 415 &ActionableEvent::DeleteRecord(RecordId { 416 did: "did:plc:asdf".into(), 417 collection: "app.t.c".into(), 418 rkey: "A".into(), 419 }), 420 0, 421 )?; 422 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 423 assert_eq!( 424 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 425 1 426 ); 427 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 428 }); 429 
430 test_each_storage!(test_accounts, |storage| { 431 // create two links 432 storage.push( 433 &ActionableEvent::CreateLinks { 434 record_id: RecordId { 435 did: "did:plc:asdf".into(), 436 collection: "app.t.c".into(), 437 rkey: "A".into(), 438 }, 439 links: vec![CollectedLink { 440 target: Link::Uri("a.com".into()), 441 path: ".abc.uri".into(), 442 }], 443 }, 444 0, 445 )?; 446 storage.push( 447 &ActionableEvent::CreateLinks { 448 record_id: RecordId { 449 did: "did:plc:asdf".into(), 450 collection: "app.t.c".into(), 451 rkey: "B".into(), 452 }, 453 links: vec![CollectedLink { 454 target: Link::Uri("b.com".into()), 455 path: ".abc.uri".into(), 456 }], 457 }, 458 0, 459 )?; 460 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 461 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1); 462 463 // and a third from a different account 464 storage.push( 465 &ActionableEvent::CreateLinks { 466 record_id: RecordId { 467 did: "did:plc:fdsa".into(), 468 collection: "app.t.c".into(), 469 rkey: "A".into(), 470 }, 471 links: vec![CollectedLink { 472 target: Link::Uri("a.com".into()), 473 path: ".abc.uri".into(), 474 }], 475 }, 476 0, 477 )?; 478 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2); 479 480 // delete the first account 481 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?; 482 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 483 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0); 484 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1); 485 }); 486 487 test_each_storage!(multi_link, |storage| { 488 storage.push( 489 &ActionableEvent::CreateLinks { 490 record_id: RecordId { 491 did: "did:plc:asdf".into(), 492 collection: "app.t.c".into(), 493 rkey: "fdsa".into(), 494 }, 495 links: vec![ 496 CollectedLink { 497 target: Link::Uri("e.com".into()), 498 path: ".abc.uri".into(), 499 }, 500 CollectedLink { 501 target: Link::Uri("f.com".into()), 502 path: 
".xyz[].uri".into(), 503 }, 504 CollectedLink { 505 target: Link::Uri("g.com".into()), 506 path: ".xyz[].uri".into(), 507 }, 508 ], 509 }, 510 0, 511 )?; 512 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 513 assert_eq!( 514 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 515 1 516 ); 517 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 518 assert_eq!( 519 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 520 1 521 ); 522 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 523 assert_eq!( 524 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 525 1 526 ); 527 528 storage.push( 529 &ActionableEvent::DeleteRecord(RecordId { 530 did: "did:plc:asdf".into(), 531 collection: "app.t.c".into(), 532 rkey: "fdsa".into(), 533 }), 534 0, 535 )?; 536 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 537 assert_eq!( 538 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 539 0 540 ); 541 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 542 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 543 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); 544 }); 545 546 test_each_storage!(update_link, |storage| { 547 // create the links 548 storage.push( 549 &ActionableEvent::CreateLinks { 550 record_id: RecordId { 551 did: "did:plc:asdf".into(), 552 collection: "app.t.c".into(), 553 rkey: "fdsa".into(), 554 }, 555 links: vec![ 556 CollectedLink { 557 target: Link::Uri("e.com".into()), 558 path: ".abc.uri".into(), 559 }, 560 CollectedLink { 561 target: Link::Uri("f.com".into()), 562 path: ".xyz[].uri".into(), 563 }, 564 CollectedLink { 565 target: Link::Uri("g.com".into()), 566 path: ".xyz[].uri".into(), 567 }, 568 ], 569 }, 570 0, 571 )?; 572 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 573 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 574 
assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 575 576 // update them 577 storage.push( 578 &ActionableEvent::UpdateLinks { 579 record_id: RecordId { 580 did: "did:plc:asdf".into(), 581 collection: "app.t.c".into(), 582 rkey: "fdsa".into(), 583 }, 584 new_links: vec![ 585 CollectedLink { 586 target: Link::Uri("h.com".into()), 587 path: ".abc.uri".into(), 588 }, 589 CollectedLink { 590 target: Link::Uri("f.com".into()), 591 path: ".xyz[].uri".into(), 592 }, 593 CollectedLink { 594 target: Link::Uri("i.com".into()), 595 path: ".xyz[].uri".into(), 596 }, 597 ], 598 }, 599 0, 600 )?; 601 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 602 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1); 603 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 604 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 605 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1); 606 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1); 607 }); 608 609 test_each_storage!(update_no_links_to_links, |storage| { 610 // update without prior create (consumer would have filtered out the original) 611 storage.push( 612 &ActionableEvent::UpdateLinks { 613 record_id: RecordId { 614 did: "did:plc:asdf".into(), 615 collection: "app.t.c".into(), 616 rkey: "asdf".into(), 617 }, 618 new_links: vec![CollectedLink { 619 target: Link::Uri("a.com".into()), 620 path: ".abc.uri".into(), 621 }], 622 }, 623 0, 624 )?; 625 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 626 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 627 }); 628 629 test_each_storage!(delete_multi_link_same_target, |storage| { 630 storage.push( 631 &ActionableEvent::CreateLinks { 632 record_id: RecordId { 633 did: "did:plc:asdf".into(), 634 collection: "app.t.c".into(), 635 rkey: "asdf".into(), 636 }, 637 links: vec![ 638 CollectedLink { 639 target: Link::Uri("a.com".into()), 640 path: ".abc.uri".into(), 641 }, 
642 CollectedLink { 643 target: Link::Uri("a.com".into()), 644 path: ".def.uri".into(), 645 }, 646 ], 647 }, 648 0, 649 )?; 650 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 651 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1); 652 653 storage.push( 654 &ActionableEvent::DeleteRecord(RecordId { 655 did: "did:plc:asdf".into(), 656 collection: "app.t.c".into(), 657 rkey: "asdf".into(), 658 }), 659 0, 660 )?; 661 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0); 662 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0); 663 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0); 664 }); 665 666 test_each_storage!(get_links_basic, |storage| { 667 storage.push( 668 &ActionableEvent::CreateLinks { 669 record_id: RecordId { 670 did: "did:plc:asdf".into(), 671 collection: "app.t.c".into(), 672 rkey: "asdf".into(), 673 }, 674 links: vec![CollectedLink { 675 target: Link::Uri("a.com".into()), 676 path: ".abc.uri".into(), 677 }], 678 }, 679 0, 680 )?; 681 assert_eq!( 682 storage.get_links( 683 "a.com", 684 "app.t.c", 685 ".abc.uri", 686 100, 687 None, 688 &HashSet::default() 689 )?, 690 PagedAppendingCollection { 691 version: (1, 0), 692 items: vec![RecordId { 693 did: "did:plc:asdf".into(), 694 collection: "app.t.c".into(), 695 rkey: "asdf".into(), 696 }], 697 next: None, 698 total: 1, 699 } 700 ); 701 assert_eq!( 702 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 703 PagedAppendingCollection { 704 version: (1, 0), 705 items: vec!["did:plc:asdf".into()], 706 next: None, 707 total: 1, 708 } 709 ); 710 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 711 }); 712 713 test_each_storage!(get_links_paged, |storage| { 714 for i in 1..=5 { 715 storage.push( 716 &ActionableEvent::CreateLinks { 717 record_id: RecordId { 718 did: format!("did:plc:asdf-{i}").into(), 719 collection: "app.t.c".into(), 720 rkey: "asdf".into(), 721 }, 722 links: vec![CollectedLink { 723 target: 
Link::Uri("a.com".into()), 724 path: ".abc.uri".into(), 725 }], 726 }, 727 0, 728 )?; 729 } 730 let links = 731 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 732 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 733 assert_eq!( 734 links, 735 PagedAppendingCollection { 736 version: (5, 0), 737 items: vec![ 738 RecordId { 739 did: "did:plc:asdf-5".into(), 740 collection: "app.t.c".into(), 741 rkey: "asdf".into(), 742 }, 743 RecordId { 744 did: "did:plc:asdf-4".into(), 745 collection: "app.t.c".into(), 746 rkey: "asdf".into(), 747 }, 748 ], 749 next: Some(3), 750 total: 5, 751 } 752 ); 753 assert_eq!( 754 dids, 755 PagedAppendingCollection { 756 version: (5, 0), 757 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 758 next: Some(3), 759 total: 5, 760 } 761 ); 762 let links = storage.get_links( 763 "a.com", 764 "app.t.c", 765 ".abc.uri", 766 2, 767 links.next, 768 &HashSet::default(), 769 )?; 770 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 771 assert_eq!( 772 links, 773 PagedAppendingCollection { 774 version: (5, 0), 775 items: vec![ 776 RecordId { 777 did: "did:plc:asdf-3".into(), 778 collection: "app.t.c".into(), 779 rkey: "asdf".into(), 780 }, 781 RecordId { 782 did: "did:plc:asdf-2".into(), 783 collection: "app.t.c".into(), 784 rkey: "asdf".into(), 785 }, 786 ], 787 next: Some(1), 788 total: 5, 789 } 790 ); 791 assert_eq!( 792 dids, 793 PagedAppendingCollection { 794 version: (5, 0), 795 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 796 next: Some(1), 797 total: 5, 798 } 799 ); 800 let links = storage.get_links( 801 "a.com", 802 "app.t.c", 803 ".abc.uri", 804 2, 805 links.next, 806 &HashSet::default(), 807 )?; 808 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 809 assert_eq!( 810 links, 811 PagedAppendingCollection { 812 version: (5, 0), 813 items: vec![RecordId { 814 did: 
"did:plc:asdf-1".into(), 815 collection: "app.t.c".into(), 816 rkey: "asdf".into(), 817 },], 818 next: None, 819 total: 5, 820 } 821 ); 822 assert_eq!( 823 dids, 824 PagedAppendingCollection { 825 version: (5, 0), 826 items: vec!["did:plc:asdf-1".into()], 827 next: None, 828 total: 5, 829 } 830 ); 831 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 832 }); 833 834 test_each_storage!(get_filtered_links, |storage| { 835 let links = storage.get_links( 836 "a.com", 837 "app.t.c", 838 ".abc.uri", 839 2, 840 None, 841 &HashSet::from([Did("did:plc:linker".to_string())]), 842 )?; 843 assert_eq!( 844 links, 845 PagedAppendingCollection { 846 version: (0, 0), 847 items: vec![], 848 next: None, 849 total: 0, 850 } 851 ); 852 853 storage.push( 854 &ActionableEvent::CreateLinks { 855 record_id: RecordId { 856 did: "did:plc:linker".into(), 857 collection: "app.t.c".into(), 858 rkey: "asdf".into(), 859 }, 860 links: vec![CollectedLink { 861 target: Link::Uri("a.com".into()), 862 path: ".abc.uri".into(), 863 }], 864 }, 865 0, 866 )?; 867 868 let links = storage.get_links( 869 "a.com", 870 "app.t.c", 871 ".abc.uri", 872 2, 873 None, 874 &HashSet::from([Did("did:plc:linker".to_string())]), 875 )?; 876 assert_eq!( 877 links, 878 PagedAppendingCollection { 879 version: (1, 0), 880 items: vec![RecordId { 881 did: "did:plc:linker".into(), 882 collection: "app.t.c".into(), 883 rkey: "asdf".into(), 884 },], 885 next: None, 886 total: 1, 887 } 888 ); 889 890 let links = storage.get_links( 891 "a.com", 892 "app.t.c", 893 ".abc.uri", 894 2, 895 None, 896 &HashSet::from([Did("did:plc:someone-else".to_string())]), 897 )?; 898 assert_eq!( 899 links, 900 PagedAppendingCollection { 901 version: (0, 0), 902 items: vec![], 903 next: None, 904 total: 0, 905 } 906 ); 907 908 storage.push( 909 &ActionableEvent::CreateLinks { 910 record_id: RecordId { 911 did: "did:plc:linker".into(), 912 collection: "app.t.c".into(), 913 rkey: "asdf-2".into(), 914 }, 915 links: vec![CollectedLink { 916 
target: Link::Uri("a.com".into()), 917 path: ".abc.uri".into(), 918 }], 919 }, 920 0, 921 )?; 922 storage.push( 923 &ActionableEvent::CreateLinks { 924 record_id: RecordId { 925 did: "did:plc:someone-else".into(), 926 collection: "app.t.c".into(), 927 rkey: "asdf".into(), 928 }, 929 links: vec![CollectedLink { 930 target: Link::Uri("a.com".into()), 931 path: ".abc.uri".into(), 932 }], 933 }, 934 0, 935 )?; 936 937 let links = storage.get_links( 938 "a.com", 939 "app.t.c", 940 ".abc.uri", 941 2, 942 None, 943 &HashSet::from([Did("did:plc:linker".to_string())]), 944 )?; 945 assert_eq!( 946 links, 947 PagedAppendingCollection { 948 version: (2, 0), 949 items: vec![ 950 RecordId { 951 did: "did:plc:linker".into(), 952 collection: "app.t.c".into(), 953 rkey: "asdf-2".into(), 954 }, 955 RecordId { 956 did: "did:plc:linker".into(), 957 collection: "app.t.c".into(), 958 rkey: "asdf".into(), 959 }, 960 ], 961 next: None, 962 total: 2, 963 } 964 ); 965 966 let links = storage.get_links( 967 "a.com", 968 "app.t.c", 969 ".abc.uri", 970 2, 971 None, 972 &HashSet::from([ 973 Did("did:plc:linker".to_string()), 974 Did("did:plc:someone-else".to_string()), 975 ]), 976 )?; 977 assert_eq!( 978 links, 979 PagedAppendingCollection { 980 version: (3, 0), 981 items: vec![ 982 RecordId { 983 did: "did:plc:someone-else".into(), 984 collection: "app.t.c".into(), 985 rkey: "asdf".into(), 986 }, 987 RecordId { 988 did: "did:plc:linker".into(), 989 collection: "app.t.c".into(), 990 rkey: "asdf-2".into(), 991 }, 992 ], 993 next: Some(1), 994 total: 3, 995 } 996 ); 997 998 let links = storage.get_links( 999 "a.com", 1000 "app.t.c", 1001 ".abc.uri", 1002 2, 1003 None, 1004 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 1005 )?; 1006 assert_eq!( 1007 links, 1008 PagedAppendingCollection { 1009 version: (0, 0), 1010 items: vec![], 1011 next: None, 1012 total: 0, 1013 } 1014 ); 1015 }); 1016 1017 test_each_storage!(get_links_exact_multiple, |storage| { 1018 for i in 1..=4 { 1019 
storage.push( 1020 &ActionableEvent::CreateLinks { 1021 record_id: RecordId { 1022 did: format!("did:plc:asdf-{i}").into(), 1023 collection: "app.t.c".into(), 1024 rkey: "asdf".into(), 1025 }, 1026 links: vec![CollectedLink { 1027 target: Link::Uri("a.com".into()), 1028 path: ".abc.uri".into(), 1029 }], 1030 }, 1031 0, 1032 )?; 1033 } 1034 let links = 1035 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1036 assert_eq!( 1037 links, 1038 PagedAppendingCollection { 1039 version: (4, 0), 1040 items: vec![ 1041 RecordId { 1042 did: "did:plc:asdf-4".into(), 1043 collection: "app.t.c".into(), 1044 rkey: "asdf".into(), 1045 }, 1046 RecordId { 1047 did: "did:plc:asdf-3".into(), 1048 collection: "app.t.c".into(), 1049 rkey: "asdf".into(), 1050 }, 1051 ], 1052 next: Some(2), 1053 total: 4, 1054 } 1055 ); 1056 let links = storage.get_links( 1057 "a.com", 1058 "app.t.c", 1059 ".abc.uri", 1060 2, 1061 links.next, 1062 &HashSet::default(), 1063 )?; 1064 assert_eq!( 1065 links, 1066 PagedAppendingCollection { 1067 version: (4, 0), 1068 items: vec![ 1069 RecordId { 1070 did: "did:plc:asdf-2".into(), 1071 collection: "app.t.c".into(), 1072 rkey: "asdf".into(), 1073 }, 1074 RecordId { 1075 did: "did:plc:asdf-1".into(), 1076 collection: "app.t.c".into(), 1077 rkey: "asdf".into(), 1078 }, 1079 ], 1080 next: None, 1081 total: 4, 1082 } 1083 ); 1084 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1085 }); 1086 1087 test_each_storage!(page_links_while_new_links_arrive, |storage| { 1088 for i in 1..=4 { 1089 storage.push( 1090 &ActionableEvent::CreateLinks { 1091 record_id: RecordId { 1092 did: format!("did:plc:asdf-{i}").into(), 1093 collection: "app.t.c".into(), 1094 rkey: "asdf".into(), 1095 }, 1096 links: vec![CollectedLink { 1097 target: Link::Uri("a.com".into()), 1098 path: ".abc.uri".into(), 1099 }], 1100 }, 1101 0, 1102 )?; 1103 } 1104 let links = 1105 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1106 
assert_eq!( 1107 links, 1108 PagedAppendingCollection { 1109 version: (4, 0), 1110 items: vec![ 1111 RecordId { 1112 did: "did:plc:asdf-4".into(), 1113 collection: "app.t.c".into(), 1114 rkey: "asdf".into(), 1115 }, 1116 RecordId { 1117 did: "did:plc:asdf-3".into(), 1118 collection: "app.t.c".into(), 1119 rkey: "asdf".into(), 1120 }, 1121 ], 1122 next: Some(2), 1123 total: 4, 1124 } 1125 ); 1126 storage.push( 1127 &ActionableEvent::CreateLinks { 1128 record_id: RecordId { 1129 did: "did:plc:asdf-5".into(), 1130 collection: "app.t.c".into(), 1131 rkey: "asdf".into(), 1132 }, 1133 links: vec![CollectedLink { 1134 target: Link::Uri("a.com".into()), 1135 path: ".abc.uri".into(), 1136 }], 1137 }, 1138 0, 1139 )?; 1140 let links = storage.get_links( 1141 "a.com", 1142 "app.t.c", 1143 ".abc.uri", 1144 2, 1145 links.next, 1146 &HashSet::default(), 1147 )?; 1148 assert_eq!( 1149 links, 1150 PagedAppendingCollection { 1151 version: (5, 0), 1152 items: vec![ 1153 RecordId { 1154 did: "did:plc:asdf-2".into(), 1155 collection: "app.t.c".into(), 1156 rkey: "asdf".into(), 1157 }, 1158 RecordId { 1159 did: "did:plc:asdf-1".into(), 1160 collection: "app.t.c".into(), 1161 rkey: "asdf".into(), 1162 }, 1163 ], 1164 next: None, 1165 total: 5, 1166 } 1167 ); 1168 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 1169 }); 1170 1171 test_each_storage!(page_links_while_some_are_deleted, |storage| { 1172 for i in 1..=4 { 1173 storage.push( 1174 &ActionableEvent::CreateLinks { 1175 record_id: RecordId { 1176 did: format!("did:plc:asdf-{i}").into(), 1177 collection: "app.t.c".into(), 1178 rkey: "asdf".into(), 1179 }, 1180 links: vec![CollectedLink { 1181 target: Link::Uri("a.com".into()), 1182 path: ".abc.uri".into(), 1183 }], 1184 }, 1185 0, 1186 )?; 1187 } 1188 let links = 1189 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1190 assert_eq!( 1191 links, 1192 PagedAppendingCollection { 1193 version: (4, 0), 1194 items: vec![ 1195 RecordId { 1196 did: 
"did:plc:asdf-4".into(), 1197 collection: "app.t.c".into(), 1198 rkey: "asdf".into(), 1199 }, 1200 RecordId { 1201 did: "did:plc:asdf-3".into(), 1202 collection: "app.t.c".into(), 1203 rkey: "asdf".into(), 1204 }, 1205 ], 1206 next: Some(2), 1207 total: 4, 1208 } 1209 ); 1210 storage.push( 1211 &ActionableEvent::DeleteRecord(RecordId { 1212 did: "did:plc:asdf-2".into(), 1213 collection: "app.t.c".into(), 1214 rkey: "asdf".into(), 1215 }), 1216 0, 1217 )?; 1218 let links = storage.get_links( 1219 "a.com", 1220 "app.t.c", 1221 ".abc.uri", 1222 2, 1223 links.next, 1224 &HashSet::default(), 1225 )?; 1226 assert_eq!( 1227 links, 1228 PagedAppendingCollection { 1229 version: (4, 1), 1230 items: vec![RecordId { 1231 did: "did:plc:asdf-1".into(), 1232 collection: "app.t.c".into(), 1233 rkey: "asdf".into(), 1234 },], 1235 next: None, 1236 total: 3, 1237 } 1238 ); 1239 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3); 1240 }); 1241 1242 test_each_storage!(page_links_accounts_inactive, |storage| { 1243 for i in 1..=4 { 1244 storage.push( 1245 &ActionableEvent::CreateLinks { 1246 record_id: RecordId { 1247 did: format!("did:plc:asdf-{i}").into(), 1248 collection: "app.t.c".into(), 1249 rkey: "asdf".into(), 1250 }, 1251 links: vec![CollectedLink { 1252 target: Link::Uri("a.com".into()), 1253 path: ".abc.uri".into(), 1254 }], 1255 }, 1256 0, 1257 )?; 1258 } 1259 let links = 1260 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1261 assert_eq!( 1262 links, 1263 PagedAppendingCollection { 1264 version: (4, 0), 1265 items: vec![ 1266 RecordId { 1267 did: "did:plc:asdf-4".into(), 1268 collection: "app.t.c".into(), 1269 rkey: "asdf".into(), 1270 }, 1271 RecordId { 1272 did: "did:plc:asdf-3".into(), 1273 collection: "app.t.c".into(), 1274 rkey: "asdf".into(), 1275 }, 1276 ], 1277 next: Some(2), 1278 total: 4, 1279 } 1280 ); 1281 storage.push( 1282 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()), 1283 0, 1284 )?; 1285 let links = 
storage.get_links(
            "a.com",
            "app.t.c",
            ".abc.uri",
            2,
            links.next,
            &HashSet::default(),
        )?;
        // Second page, requested after did:plc:asdf-1 was deactivated: only
        // asdf-2 is expected, while `version` and `total` still report 4 links.
        let final_page = PagedAppendingCollection {
            version: (4, 0),
            items: vec![RecordId {
                did: "did:plc:asdf-2".into(),
                collection: "app.t.c".into(),
                rkey: "asdf".into(),
            }],
            next: None,
            total: 4,
        };
        assert_eq!(links, final_page);
        assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4);
    });

    test_each_storage!(get_all_counts, |storage| {
        // A single record that links to a.com through two different paths.
        let link_to_a = |path: &'static str| CollectedLink {
            target: Link::Uri("a.com".into()),
            path: path.into(),
        };
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:asdf".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![link_to_a(".abc.uri"), link_to_a(".def.uri")],
            },
            0,
        )?;

        // Per-collection, per-path record counts.
        let record_counts = storage.get_all_record_counts("a.com")?;
        assert_eq!(
            record_counts,
            HashMap::from([(
                String::from("app.t.c"),
                HashMap::from([
                    (String::from(".abc.uri"), 1),
                    (String::from(".def.uri"), 1),
                ]),
            )])
        );

        // Same shape, but each path carries both record and distinct-did counts.
        let one_of_each = || CountsByCount {
            records: 1,
            distinct_dids: 1,
        };
        let all_counts = storage.get_all_counts("a.com")?;
        assert_eq!(
            all_counts,
            HashMap::from([(
                String::from("app.t.c"),
                HashMap::from([
                    (String::from(".abc.uri"), one_of_each()),
                    (String::from(".def.uri"), one_of_each()),
                ]),
            )])
        );

        assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
    });

    //////// many-to-many /////////

    test_each_storage!(get_m2m_counts_empty, |storage| {
        // Nothing has been pushed, so the query must come back as an empty last page.
        let page = storage.get_many_to_many_counts(
            "a.com",
            "a.b.c",
            ".d.e",
            ".f.g",
            10,
            None,
            &HashSet::new(),
            &HashSet::new(),
        )?;
        assert_eq!(
            page,
            PagedOrderedCollection {
                items: vec![],
                next: None,
            }
        );
    });

    test_each_storage!(get_m2m_counts_single, |storage| {
        let link = |target: &'static str, path: &'static str| CollectedLink {
            target: Link::Uri(target.into()),
            path: path.into(),
        };
        // One record linking to a.com, plus b.com through two different paths;
        // only the `.def.uri` path is queried below.
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:asdf".into(),
                    collection: "app.t.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![
                    link("a.com", ".abc.uri"),
                    link("b.com", ".def.uri"),
                    link("b.com", ".ghi.uri"),
                ],
            },
            0,
        )?;

        let page = storage.get_many_to_many_counts(
            "a.com",
            "app.t.c",
            ".abc.uri",
            ".def.uri",
            10,
            None,
            &HashSet::new(),
            &HashSet::new(),
        )?;
        assert_eq!(
            page,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 1, 1)],
                next: None,
            }
        );
    });

    test_each_storage!(get_m2m_counts_filters, |storage| {
        // (linking did, rkey, target reached through `.def.uri`). Every record
        // also links to a.com through `.abc.uri`. Cursors run 0, 1, 2, 3.
        let records = [
            ("did:plc:asdf", "asdf", "b.com"),
            ("did:plc:asdfasdf", "asdf", "b.com"),
            ("did:plc:fdsa", "asdf", "c.com"),
            ("did:plc:fdsa", "asdf2", "c.com"),
        ];
        for (cursor, (did, rkey, other_target)) in (0u64..).zip(records) {
            storage.push(
                &ActionableEvent::CreateLinks {
                    record_id: RecordId {
                        did: did.into(),
                        collection: "app.t.c".into(),
                        rkey: rkey.into(),
                    },
                    links: vec![
                        CollectedLink {
                            target: Link::Uri("a.com".into()),
                            path: ".abc.uri".into(),
                        },
                        CollectedLink {
                            target: Link::Uri(other_target.into()),
                            path: ".def.uri".into(),
                        },
                    ],
                },
                cursor,
            )?;
        }

        // All three assertions run the same query and differ only in the filters.
        let query = |only_dids: &HashSet<Did>, only_targets: &HashSet<String>| {
            storage.get_many_to_many_counts(
                "a.com",
                "app.t.c",
                ".abc.uri",
                ".def.uri",
                10,
                None,
                only_dids,
                only_targets,
            )
        };
        let no_dids: HashSet<Did> = HashSet::new();
        let no_targets: HashSet<String> = HashSet::new();

        // Unfiltered: both secondary targets are reported.
        assert_eq!(
            query(&no_dids, &no_targets)?,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1)],
                next: None,
            }
        );

        // Restricted to links created by did:plc:fdsa.
        let only_fdsa = HashSet::from([Did("did:plc:fdsa".to_string())]);
        assert_eq!(
            query(&only_fdsa, &no_targets)?,
            PagedOrderedCollection {
                items: vec![("c.com".to_string(), 2, 1)],
                next: None,
            }
        );

        // Restricted to the secondary target b.com.
        let only_b = HashSet::from(["b.com".to_string()]);
        assert_eq!(
            query(&no_dids, &only_b)?,
            PagedOrderedCollection {
                items: vec![("b.com".to_string(), 2, 2)],
                next: None,
            }
        );
    });
}