// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2use anyhow::Result; 3use serde::{Deserialize, Serialize}; 4use std::collections::{HashMap, HashSet}; 5 6pub mod mem_store; 7pub use mem_store::MemStorage; 8 9#[cfg(feature = "rocks")] 10pub mod rocks_store; 11#[cfg(feature = "rocks")] 12pub use rocks_store::RocksStorage; 13 14#[derive(Debug, PartialEq)] 15pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 17 pub items: Vec<T>, 18 pub next: Option<u64>, 19 pub total: u64, 20} 21 22#[derive(Debug, Deserialize, Serialize, PartialEq)] 23pub struct StorageStats { 24 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 25 /// for example: new user A follows users B and C. this count will only increment by one, for A. 26 pub dids: u64, 27 28 /// estimate targets * distinct (collection, path)s to reference them. 29 /// distinct targets alone are currently challenging to estimate. 30 pub targetables: u64, 31 32 /// estimate of the count of atproto records seen that contain links. 33 /// records with multiple links are single-counted. 34 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 
35 pub linking_records: u64, 36} 37 38pub trait LinkStorage: Send + Sync { 39 /// jetstream cursor from last saved actions, if available 40 fn get_cursor(&mut self) -> Result<Option<u64>> { 41 Ok(None) 42 } 43 44 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>; 45 46 // readers are off from the writer instance 47 fn to_readable(&mut self) -> impl LinkReader; 48} 49 50pub trait LinkReader: Clone + Send + Sync + 'static { 51 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 52 53 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 54 55 fn get_links( 56 &self, 57 target: &str, 58 collection: &str, 59 path: &str, 60 limit: u64, 61 until: Option<u64>, 62 filter_dids: &HashSet<Did>, 63 ) -> Result<PagedAppendingCollection<RecordId>>; 64 65 fn get_distinct_dids( 66 &self, 67 target: &str, 68 collection: &str, 69 path: &str, 70 limit: u64, 71 until: Option<u64>, 72 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 73 74 fn get_all_record_counts(&self, _target: &str) 75 -> Result<HashMap<String, HashMap<String, u64>>>; 76 77 fn get_all_counts( 78 &self, 79 _target: &str, 80 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>; 81 82 /// assume all stats are estimates, since exact counts are very challenging for LSMs 83 fn get_stats(&self) -> Result<StorageStats>; 84} 85 86#[cfg(test)] 87mod tests { 88 use super::*; 89 use links::{CollectedLink, Link}; 90 use std::ops::RangeBounds; 91 92 macro_rules! 
test_each_storage { 93 ($test_name:ident, |$storage_label:ident| $test_code:block) => { 94 #[test] 95 fn $test_name() -> Result<()> { 96 { 97 println!("=> testing with memstorage backend"); 98 #[allow(unused_mut)] 99 let mut $storage_label = MemStorage::new(); 100 $test_code 101 } 102 103 #[cfg(feature = "rocks")] 104 { 105 println!("=> testing with rocksdb backend"); 106 let rocks_db_path = tempfile::tempdir()?; 107 #[allow(unused_mut)] 108 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?; 109 $test_code 110 } 111 112 Ok(()) 113 } 114 }; 115 } 116 117 fn assert_stats( 118 stats: StorageStats, 119 dids: impl RangeBounds<u64>, 120 targetables: impl RangeBounds<u64>, 121 linking_records: impl RangeBounds<u64>, 122 ) { 123 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) { 124 assert!( 125 rb.contains(&stat), 126 "{name:?}: {stat:?} not in range {:?}–{:?}", 127 rb.start_bound(), 128 rb.end_bound() 129 ); 130 } 131 check("dids", stats.dids, dids); 132 check("targetables", stats.targetables, targetables); 133 check("linking_records", stats.linking_records, linking_records); 134 } 135 136 test_each_storage!(test_empty, |storage| { 137 assert_eq!(storage.get_count("", "", "")?, 0); 138 assert_eq!(storage.get_count("a", "b", "c")?, 0); 139 assert_eq!( 140 storage.get_count( 141 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f", 142 "app.t.c", 143 ".reply.parent.uri" 144 )?, 145 0 146 ); 147 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 148 assert_eq!( 149 storage.get_links( 150 "a.com", 151 "app.t.c", 152 ".abc.uri", 153 100, 154 None, 155 &HashSet::default() 156 )?, 157 PagedAppendingCollection { 158 version: (0, 0), 159 items: vec![], 160 next: None, 161 total: 0, 162 } 163 ); 164 assert_eq!( 165 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 166 PagedAppendingCollection { 167 version: (0, 0), 168 items: vec![], 169 next: None, 170 total: 0, 171 } 172 ); 173 
assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 174 assert_eq!( 175 storage.get_all_record_counts("bad-example.com")?, 176 HashMap::new() 177 ); 178 179 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 180 }); 181 182 test_each_storage!(test_add_link, |storage| { 183 storage.push( 184 &ActionableEvent::CreateLinks { 185 record_id: RecordId { 186 did: "did:plc:asdf".into(), 187 collection: "app.t.c".into(), 188 rkey: "fdsa".into(), 189 }, 190 links: vec![CollectedLink { 191 target: Link::Uri("e.com".into()), 192 path: ".abc.uri".into(), 193 }], 194 }, 195 0, 196 )?; 197 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 198 assert_eq!( 199 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 200 1 201 ); 202 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 203 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 204 assert_eq!( 205 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 206 0 207 ); 208 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 209 }); 210 211 test_each_storage!(test_links, |storage| { 212 storage.push( 213 &ActionableEvent::CreateLinks { 214 record_id: RecordId { 215 did: "did:plc:asdf".into(), 216 collection: "app.t.c".into(), 217 rkey: "fdsa".into(), 218 }, 219 links: vec![CollectedLink { 220 target: Link::Uri("e.com".into()), 221 path: ".abc.uri".into(), 222 }], 223 }, 224 0, 225 )?; 226 227 // delete under the wrong collection 228 storage.push( 229 &ActionableEvent::DeleteRecord(RecordId { 230 did: "did:plc:asdf".into(), 231 collection: "app.test.wrongcollection".into(), 232 rkey: "fdsa".into(), 233 }), 234 0, 235 )?; 236 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 237 238 // delete under the wrong rkey 239 storage.push( 240 &ActionableEvent::DeleteRecord(RecordId { 241 did: "did:plc:asdf".into(), 242 collection: "app.t.c".into(), 243 rkey: "wrongkey".into(), 244 }), 245 0, 246 )?; 247 
assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 248 249 // finally actually delete it 250 storage.push( 251 &ActionableEvent::DeleteRecord(RecordId { 252 did: "did:plc:asdf".into(), 253 collection: "app.t.c".into(), 254 rkey: "fdsa".into(), 255 }), 256 0, 257 )?; 258 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 259 260 // put it back 261 storage.push( 262 &ActionableEvent::CreateLinks { 263 record_id: RecordId { 264 did: "did:plc:asdf".into(), 265 collection: "app.t.c".into(), 266 rkey: "fdsa".into(), 267 }, 268 links: vec![CollectedLink { 269 target: Link::Uri("e.com".into()), 270 path: ".abc.uri".into(), 271 }], 272 }, 273 0, 274 )?; 275 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 276 assert_eq!( 277 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 278 1 279 ); 280 281 // add another link from this user 282 storage.push( 283 &ActionableEvent::CreateLinks { 284 record_id: RecordId { 285 did: "did:plc:asdf".into(), 286 collection: "app.t.c".into(), 287 rkey: "fdsa2".into(), 288 }, 289 links: vec![CollectedLink { 290 target: Link::Uri("e.com".into()), 291 path: ".abc.uri".into(), 292 }], 293 }, 294 0, 295 )?; 296 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 297 assert_eq!( 298 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 299 1 300 ); 301 302 // add a link from someone else 303 storage.push( 304 &ActionableEvent::CreateLinks { 305 record_id: RecordId { 306 did: "did:plc:asdfasdf".into(), 307 collection: "app.t.c".into(), 308 rkey: "fdsa".into(), 309 }, 310 links: vec![CollectedLink { 311 target: Link::Uri("e.com".into()), 312 path: ".abc.uri".into(), 313 }], 314 }, 315 0, 316 )?; 317 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 318 assert_eq!( 319 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 320 2 321 ); 322 323 // aaaand delete the first one again 324 storage.push( 325 
&ActionableEvent::DeleteRecord(RecordId { 326 did: "did:plc:asdf".into(), 327 collection: "app.t.c".into(), 328 rkey: "fdsa".into(), 329 }), 330 0, 331 )?; 332 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 333 assert_eq!( 334 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 335 2 336 ); 337 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 338 }); 339 340 test_each_storage!(test_two_user_links_delete_one, |storage| { 341 // create the first link 342 storage.push( 343 &ActionableEvent::CreateLinks { 344 record_id: RecordId { 345 did: "did:plc:asdf".into(), 346 collection: "app.t.c".into(), 347 rkey: "A".into(), 348 }, 349 links: vec![CollectedLink { 350 target: Link::Uri("e.com".into()), 351 path: ".abc.uri".into(), 352 }], 353 }, 354 0, 355 )?; 356 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 357 assert_eq!( 358 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 359 1 360 ); 361 362 // create the second link (same user, different rkey) 363 storage.push( 364 &ActionableEvent::CreateLinks { 365 record_id: RecordId { 366 did: "did:plc:asdf".into(), 367 collection: "app.t.c".into(), 368 rkey: "B".into(), 369 }, 370 links: vec![CollectedLink { 371 target: Link::Uri("e.com".into()), 372 path: ".abc.uri".into(), 373 }], 374 }, 375 0, 376 )?; 377 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 378 assert_eq!( 379 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 380 1 381 ); 382 383 // aaaand delete the first link 384 storage.push( 385 &ActionableEvent::DeleteRecord(RecordId { 386 did: "did:plc:asdf".into(), 387 collection: "app.t.c".into(), 388 rkey: "A".into(), 389 }), 390 0, 391 )?; 392 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 393 assert_eq!( 394 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 395 1 396 ); 397 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 398 }); 399 400 test_each_storage!(test_accounts, 
|storage| { 401 // create two links 402 storage.push( 403 &ActionableEvent::CreateLinks { 404 record_id: RecordId { 405 did: "did:plc:asdf".into(), 406 collection: "app.t.c".into(), 407 rkey: "A".into(), 408 }, 409 links: vec![CollectedLink { 410 target: Link::Uri("a.com".into()), 411 path: ".abc.uri".into(), 412 }], 413 }, 414 0, 415 )?; 416 storage.push( 417 &ActionableEvent::CreateLinks { 418 record_id: RecordId { 419 did: "did:plc:asdf".into(), 420 collection: "app.t.c".into(), 421 rkey: "B".into(), 422 }, 423 links: vec![CollectedLink { 424 target: Link::Uri("b.com".into()), 425 path: ".abc.uri".into(), 426 }], 427 }, 428 0, 429 )?; 430 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 431 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1); 432 433 // and a third from a different account 434 storage.push( 435 &ActionableEvent::CreateLinks { 436 record_id: RecordId { 437 did: "did:plc:fdsa".into(), 438 collection: "app.t.c".into(), 439 rkey: "A".into(), 440 }, 441 links: vec![CollectedLink { 442 target: Link::Uri("a.com".into()), 443 path: ".abc.uri".into(), 444 }], 445 }, 446 0, 447 )?; 448 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2); 449 450 // delete the first account 451 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?; 452 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 453 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0); 454 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1); 455 }); 456 457 test_each_storage!(multi_link, |storage| { 458 storage.push( 459 &ActionableEvent::CreateLinks { 460 record_id: RecordId { 461 did: "did:plc:asdf".into(), 462 collection: "app.t.c".into(), 463 rkey: "fdsa".into(), 464 }, 465 links: vec![ 466 CollectedLink { 467 target: Link::Uri("e.com".into()), 468 path: ".abc.uri".into(), 469 }, 470 CollectedLink { 471 target: Link::Uri("f.com".into()), 472 path: ".xyz[].uri".into(), 473 }, 474 CollectedLink { 475 
target: Link::Uri("g.com".into()), 476 path: ".xyz[].uri".into(), 477 }, 478 ], 479 }, 480 0, 481 )?; 482 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 483 assert_eq!( 484 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 485 1 486 ); 487 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 488 assert_eq!( 489 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 490 1 491 ); 492 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 493 assert_eq!( 494 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 495 1 496 ); 497 498 storage.push( 499 &ActionableEvent::DeleteRecord(RecordId { 500 did: "did:plc:asdf".into(), 501 collection: "app.t.c".into(), 502 rkey: "fdsa".into(), 503 }), 504 0, 505 )?; 506 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 507 assert_eq!( 508 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 509 0 510 ); 511 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 512 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 513 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); 514 }); 515 516 test_each_storage!(update_link, |storage| { 517 // create the links 518 storage.push( 519 &ActionableEvent::CreateLinks { 520 record_id: RecordId { 521 did: "did:plc:asdf".into(), 522 collection: "app.t.c".into(), 523 rkey: "fdsa".into(), 524 }, 525 links: vec![ 526 CollectedLink { 527 target: Link::Uri("e.com".into()), 528 path: ".abc.uri".into(), 529 }, 530 CollectedLink { 531 target: Link::Uri("f.com".into()), 532 path: ".xyz[].uri".into(), 533 }, 534 CollectedLink { 535 target: Link::Uri("g.com".into()), 536 path: ".xyz[].uri".into(), 537 }, 538 ], 539 }, 540 0, 541 )?; 542 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 543 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 544 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 545 546 // 
update them 547 storage.push( 548 &ActionableEvent::UpdateLinks { 549 record_id: RecordId { 550 did: "did:plc:asdf".into(), 551 collection: "app.t.c".into(), 552 rkey: "fdsa".into(), 553 }, 554 new_links: vec![ 555 CollectedLink { 556 target: Link::Uri("h.com".into()), 557 path: ".abc.uri".into(), 558 }, 559 CollectedLink { 560 target: Link::Uri("f.com".into()), 561 path: ".xyz[].uri".into(), 562 }, 563 CollectedLink { 564 target: Link::Uri("i.com".into()), 565 path: ".xyz[].uri".into(), 566 }, 567 ], 568 }, 569 0, 570 )?; 571 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 572 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1); 573 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 574 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 575 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1); 576 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1); 577 }); 578 579 test_each_storage!(update_no_links_to_links, |storage| { 580 // update without prior create (consumer would have filtered out the original) 581 storage.push( 582 &ActionableEvent::UpdateLinks { 583 record_id: RecordId { 584 did: "did:plc:asdf".into(), 585 collection: "app.t.c".into(), 586 rkey: "asdf".into(), 587 }, 588 new_links: vec![CollectedLink { 589 target: Link::Uri("a.com".into()), 590 path: ".abc.uri".into(), 591 }], 592 }, 593 0, 594 )?; 595 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 596 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 597 }); 598 599 test_each_storage!(delete_multi_link_same_target, |storage| { 600 storage.push( 601 &ActionableEvent::CreateLinks { 602 record_id: RecordId { 603 did: "did:plc:asdf".into(), 604 collection: "app.t.c".into(), 605 rkey: "asdf".into(), 606 }, 607 links: vec![ 608 CollectedLink { 609 target: Link::Uri("a.com".into()), 610 path: ".abc.uri".into(), 611 }, 612 CollectedLink { 613 target: Link::Uri("a.com".into()), 614 path: 
".def.uri".into(), 615 }, 616 ], 617 }, 618 0, 619 )?; 620 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 621 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1); 622 623 storage.push( 624 &ActionableEvent::DeleteRecord(RecordId { 625 did: "did:plc:asdf".into(), 626 collection: "app.t.c".into(), 627 rkey: "asdf".into(), 628 }), 629 0, 630 )?; 631 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0); 632 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0); 633 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0); 634 }); 635 636 test_each_storage!(get_links_basic, |storage| { 637 storage.push( 638 &ActionableEvent::CreateLinks { 639 record_id: RecordId { 640 did: "did:plc:asdf".into(), 641 collection: "app.t.c".into(), 642 rkey: "asdf".into(), 643 }, 644 links: vec![CollectedLink { 645 target: Link::Uri("a.com".into()), 646 path: ".abc.uri".into(), 647 }], 648 }, 649 0, 650 )?; 651 assert_eq!( 652 storage.get_links( 653 "a.com", 654 "app.t.c", 655 ".abc.uri", 656 100, 657 None, 658 &HashSet::default() 659 )?, 660 PagedAppendingCollection { 661 version: (1, 0), 662 items: vec![RecordId { 663 did: "did:plc:asdf".into(), 664 collection: "app.t.c".into(), 665 rkey: "asdf".into(), 666 }], 667 next: None, 668 total: 1, 669 } 670 ); 671 assert_eq!( 672 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 673 PagedAppendingCollection { 674 version: (1, 0), 675 items: vec!["did:plc:asdf".into()], 676 next: None, 677 total: 1, 678 } 679 ); 680 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 681 }); 682 683 test_each_storage!(get_links_paged, |storage| { 684 for i in 1..=5 { 685 storage.push( 686 &ActionableEvent::CreateLinks { 687 record_id: RecordId { 688 did: format!("did:plc:asdf-{i}").into(), 689 collection: "app.t.c".into(), 690 rkey: "asdf".into(), 691 }, 692 links: vec![CollectedLink { 693 target: Link::Uri("a.com".into()), 694 path: ".abc.uri".into(), 695 }], 696 }, 697 0, 698 
)?; 699 } 700 let links = 701 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 702 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 703 assert_eq!( 704 links, 705 PagedAppendingCollection { 706 version: (5, 0), 707 items: vec![ 708 RecordId { 709 did: "did:plc:asdf-5".into(), 710 collection: "app.t.c".into(), 711 rkey: "asdf".into(), 712 }, 713 RecordId { 714 did: "did:plc:asdf-4".into(), 715 collection: "app.t.c".into(), 716 rkey: "asdf".into(), 717 }, 718 ], 719 next: Some(3), 720 total: 5, 721 } 722 ); 723 assert_eq!( 724 dids, 725 PagedAppendingCollection { 726 version: (5, 0), 727 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 728 next: Some(3), 729 total: 5, 730 } 731 ); 732 let links = storage.get_links( 733 "a.com", 734 "app.t.c", 735 ".abc.uri", 736 2, 737 links.next, 738 &HashSet::default(), 739 )?; 740 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 741 assert_eq!( 742 links, 743 PagedAppendingCollection { 744 version: (5, 0), 745 items: vec![ 746 RecordId { 747 did: "did:plc:asdf-3".into(), 748 collection: "app.t.c".into(), 749 rkey: "asdf".into(), 750 }, 751 RecordId { 752 did: "did:plc:asdf-2".into(), 753 collection: "app.t.c".into(), 754 rkey: "asdf".into(), 755 }, 756 ], 757 next: Some(1), 758 total: 5, 759 } 760 ); 761 assert_eq!( 762 dids, 763 PagedAppendingCollection { 764 version: (5, 0), 765 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 766 next: Some(1), 767 total: 5, 768 } 769 ); 770 let links = storage.get_links( 771 "a.com", 772 "app.t.c", 773 ".abc.uri", 774 2, 775 links.next, 776 &HashSet::default(), 777 )?; 778 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 779 assert_eq!( 780 links, 781 PagedAppendingCollection { 782 version: (5, 0), 783 items: vec![RecordId { 784 did: "did:plc:asdf-1".into(), 785 collection: "app.t.c".into(), 786 rkey: "asdf".into(), 787 },], 788 
next: None, 789 total: 5, 790 } 791 ); 792 assert_eq!( 793 dids, 794 PagedAppendingCollection { 795 version: (5, 0), 796 items: vec!["did:plc:asdf-1".into()], 797 next: None, 798 total: 5, 799 } 800 ); 801 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 802 }); 803 804 test_each_storage!(get_filtered_links, |storage| { 805 let links = storage.get_links( 806 "a.com", 807 "app.t.c", 808 ".abc.uri", 809 2, 810 None, 811 &HashSet::from([Did("did:plc:linker".to_string())]), 812 )?; 813 assert_eq!( 814 links, 815 PagedAppendingCollection { 816 version: (0, 0), 817 items: vec![], 818 next: None, 819 total: 0, 820 } 821 ); 822 823 storage.push( 824 &ActionableEvent::CreateLinks { 825 record_id: RecordId { 826 did: "did:plc:linker".into(), 827 collection: "app.t.c".into(), 828 rkey: "asdf".into(), 829 }, 830 links: vec![CollectedLink { 831 target: Link::Uri("a.com".into()), 832 path: ".abc.uri".into(), 833 }], 834 }, 835 0, 836 )?; 837 838 let links = storage.get_links( 839 "a.com", 840 "app.t.c", 841 ".abc.uri", 842 2, 843 None, 844 &HashSet::from([Did("did:plc:linker".to_string())]), 845 )?; 846 assert_eq!( 847 links, 848 PagedAppendingCollection { 849 version: (1, 0), 850 items: vec![RecordId { 851 did: "did:plc:linker".into(), 852 collection: "app.t.c".into(), 853 rkey: "asdf".into(), 854 },], 855 next: None, 856 total: 1, 857 } 858 ); 859 860 let links = storage.get_links( 861 "a.com", 862 "app.t.c", 863 ".abc.uri", 864 2, 865 None, 866 &HashSet::from([Did("did:plc:someone-else".to_string())]), 867 )?; 868 assert_eq!( 869 links, 870 PagedAppendingCollection { 871 version: (0, 0), 872 items: vec![], 873 next: None, 874 total: 0, 875 } 876 ); 877 878 storage.push( 879 &ActionableEvent::CreateLinks { 880 record_id: RecordId { 881 did: "did:plc:linker".into(), 882 collection: "app.t.c".into(), 883 rkey: "asdf-2".into(), 884 }, 885 links: vec![CollectedLink { 886 target: Link::Uri("a.com".into()), 887 path: ".abc.uri".into(), 888 }], 889 }, 890 0, 891 )?; 892 
storage.push( 893 &ActionableEvent::CreateLinks { 894 record_id: RecordId { 895 did: "did:plc:someone-else".into(), 896 collection: "app.t.c".into(), 897 rkey: "asdf".into(), 898 }, 899 links: vec![CollectedLink { 900 target: Link::Uri("a.com".into()), 901 path: ".abc.uri".into(), 902 }], 903 }, 904 0, 905 )?; 906 907 let links = storage.get_links( 908 "a.com", 909 "app.t.c", 910 ".abc.uri", 911 2, 912 None, 913 &HashSet::from([Did("did:plc:linker".to_string())]), 914 )?; 915 assert_eq!( 916 links, 917 PagedAppendingCollection { 918 version: (2, 0), 919 items: vec![ 920 RecordId { 921 did: "did:plc:linker".into(), 922 collection: "app.t.c".into(), 923 rkey: "asdf-2".into(), 924 }, 925 RecordId { 926 did: "did:plc:linker".into(), 927 collection: "app.t.c".into(), 928 rkey: "asdf".into(), 929 }, 930 ], 931 next: None, 932 total: 2, 933 } 934 ); 935 936 let links = storage.get_links( 937 "a.com", 938 "app.t.c", 939 ".abc.uri", 940 2, 941 None, 942 &HashSet::from([ 943 Did("did:plc:linker".to_string()), 944 Did("did:plc:someone-else".to_string()), 945 ]), 946 )?; 947 assert_eq!( 948 links, 949 PagedAppendingCollection { 950 version: (3, 0), 951 items: vec![ 952 RecordId { 953 did: "did:plc:someone-else".into(), 954 collection: "app.t.c".into(), 955 rkey: "asdf".into(), 956 }, 957 RecordId { 958 did: "did:plc:linker".into(), 959 collection: "app.t.c".into(), 960 rkey: "asdf-2".into(), 961 }, 962 ], 963 next: Some(1), 964 total: 3, 965 } 966 ); 967 968 let links = storage.get_links( 969 "a.com", 970 "app.t.c", 971 ".abc.uri", 972 2, 973 None, 974 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), 975 )?; 976 assert_eq!( 977 links, 978 PagedAppendingCollection { 979 version: (0, 0), 980 items: vec![], 981 next: None, 982 total: 0, 983 } 984 ); 985 }); 986 987 test_each_storage!(get_links_exact_multiple, |storage| { 988 for i in 1..=4 { 989 storage.push( 990 &ActionableEvent::CreateLinks { 991 record_id: RecordId { 992 did: format!("did:plc:asdf-{i}").into(), 
993 collection: "app.t.c".into(), 994 rkey: "asdf".into(), 995 }, 996 links: vec![CollectedLink { 997 target: Link::Uri("a.com".into()), 998 path: ".abc.uri".into(), 999 }], 1000 }, 1001 0, 1002 )?; 1003 } 1004 let links = 1005 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1006 assert_eq!( 1007 links, 1008 PagedAppendingCollection { 1009 version: (4, 0), 1010 items: vec![ 1011 RecordId { 1012 did: "did:plc:asdf-4".into(), 1013 collection: "app.t.c".into(), 1014 rkey: "asdf".into(), 1015 }, 1016 RecordId { 1017 did: "did:plc:asdf-3".into(), 1018 collection: "app.t.c".into(), 1019 rkey: "asdf".into(), 1020 }, 1021 ], 1022 next: Some(2), 1023 total: 4, 1024 } 1025 ); 1026 let links = storage.get_links( 1027 "a.com", 1028 "app.t.c", 1029 ".abc.uri", 1030 2, 1031 links.next, 1032 &HashSet::default(), 1033 )?; 1034 assert_eq!( 1035 links, 1036 PagedAppendingCollection { 1037 version: (4, 0), 1038 items: vec![ 1039 RecordId { 1040 did: "did:plc:asdf-2".into(), 1041 collection: "app.t.c".into(), 1042 rkey: "asdf".into(), 1043 }, 1044 RecordId { 1045 did: "did:plc:asdf-1".into(), 1046 collection: "app.t.c".into(), 1047 rkey: "asdf".into(), 1048 }, 1049 ], 1050 next: None, 1051 total: 4, 1052 } 1053 ); 1054 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1055 }); 1056 1057 test_each_storage!(page_links_while_new_links_arrive, |storage| { 1058 for i in 1..=4 { 1059 storage.push( 1060 &ActionableEvent::CreateLinks { 1061 record_id: RecordId { 1062 did: format!("did:plc:asdf-{i}").into(), 1063 collection: "app.t.c".into(), 1064 rkey: "asdf".into(), 1065 }, 1066 links: vec![CollectedLink { 1067 target: Link::Uri("a.com".into()), 1068 path: ".abc.uri".into(), 1069 }], 1070 }, 1071 0, 1072 )?; 1073 } 1074 let links = 1075 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1076 assert_eq!( 1077 links, 1078 PagedAppendingCollection { 1079 version: (4, 0), 1080 items: vec![ 1081 RecordId { 1082 did: 
"did:plc:asdf-4".into(), 1083 collection: "app.t.c".into(), 1084 rkey: "asdf".into(), 1085 }, 1086 RecordId { 1087 did: "did:plc:asdf-3".into(), 1088 collection: "app.t.c".into(), 1089 rkey: "asdf".into(), 1090 }, 1091 ], 1092 next: Some(2), 1093 total: 4, 1094 } 1095 ); 1096 storage.push( 1097 &ActionableEvent::CreateLinks { 1098 record_id: RecordId { 1099 did: "did:plc:asdf-5".into(), 1100 collection: "app.t.c".into(), 1101 rkey: "asdf".into(), 1102 }, 1103 links: vec![CollectedLink { 1104 target: Link::Uri("a.com".into()), 1105 path: ".abc.uri".into(), 1106 }], 1107 }, 1108 0, 1109 )?; 1110 let links = storage.get_links( 1111 "a.com", 1112 "app.t.c", 1113 ".abc.uri", 1114 2, 1115 links.next, 1116 &HashSet::default(), 1117 )?; 1118 assert_eq!( 1119 links, 1120 PagedAppendingCollection { 1121 version: (5, 0), 1122 items: vec![ 1123 RecordId { 1124 did: "did:plc:asdf-2".into(), 1125 collection: "app.t.c".into(), 1126 rkey: "asdf".into(), 1127 }, 1128 RecordId { 1129 did: "did:plc:asdf-1".into(), 1130 collection: "app.t.c".into(), 1131 rkey: "asdf".into(), 1132 }, 1133 ], 1134 next: None, 1135 total: 5, 1136 } 1137 ); 1138 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 1139 }); 1140 1141 test_each_storage!(page_links_while_some_are_deleted, |storage| { 1142 for i in 1..=4 { 1143 storage.push( 1144 &ActionableEvent::CreateLinks { 1145 record_id: RecordId { 1146 did: format!("did:plc:asdf-{i}").into(), 1147 collection: "app.t.c".into(), 1148 rkey: "asdf".into(), 1149 }, 1150 links: vec![CollectedLink { 1151 target: Link::Uri("a.com".into()), 1152 path: ".abc.uri".into(), 1153 }], 1154 }, 1155 0, 1156 )?; 1157 } 1158 let links = 1159 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1160 assert_eq!( 1161 links, 1162 PagedAppendingCollection { 1163 version: (4, 0), 1164 items: vec![ 1165 RecordId { 1166 did: "did:plc:asdf-4".into(), 1167 collection: "app.t.c".into(), 1168 rkey: "asdf".into(), 1169 }, 1170 RecordId { 1171 did: 
"did:plc:asdf-3".into(), 1172 collection: "app.t.c".into(), 1173 rkey: "asdf".into(), 1174 }, 1175 ], 1176 next: Some(2), 1177 total: 4, 1178 } 1179 ); 1180 storage.push( 1181 &ActionableEvent::DeleteRecord(RecordId { 1182 did: "did:plc:asdf-2".into(), 1183 collection: "app.t.c".into(), 1184 rkey: "asdf".into(), 1185 }), 1186 0, 1187 )?; 1188 let links = storage.get_links( 1189 "a.com", 1190 "app.t.c", 1191 ".abc.uri", 1192 2, 1193 links.next, 1194 &HashSet::default(), 1195 )?; 1196 assert_eq!( 1197 links, 1198 PagedAppendingCollection { 1199 version: (4, 1), 1200 items: vec![RecordId { 1201 did: "did:plc:asdf-1".into(), 1202 collection: "app.t.c".into(), 1203 rkey: "asdf".into(), 1204 },], 1205 next: None, 1206 total: 3, 1207 } 1208 ); 1209 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3); 1210 }); 1211 1212 test_each_storage!(page_links_accounts_inactive, |storage| { 1213 for i in 1..=4 { 1214 storage.push( 1215 &ActionableEvent::CreateLinks { 1216 record_id: RecordId { 1217 did: format!("did:plc:asdf-{i}").into(), 1218 collection: "app.t.c".into(), 1219 rkey: "asdf".into(), 1220 }, 1221 links: vec![CollectedLink { 1222 target: Link::Uri("a.com".into()), 1223 path: ".abc.uri".into(), 1224 }], 1225 }, 1226 0, 1227 )?; 1228 } 1229 let links = 1230 storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::default())?; 1231 assert_eq!( 1232 links, 1233 PagedAppendingCollection { 1234 version: (4, 0), 1235 items: vec![ 1236 RecordId { 1237 did: "did:plc:asdf-4".into(), 1238 collection: "app.t.c".into(), 1239 rkey: "asdf".into(), 1240 }, 1241 RecordId { 1242 did: "did:plc:asdf-3".into(), 1243 collection: "app.t.c".into(), 1244 rkey: "asdf".into(), 1245 }, 1246 ], 1247 next: Some(2), 1248 total: 4, 1249 } 1250 ); 1251 storage.push( 1252 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()), 1253 0, 1254 )?; 1255 let links = storage.get_links( 1256 "a.com", 1257 "app.t.c", 1258 ".abc.uri", 1259 2, 1260 links.next, 1261 &HashSet::default(), 1262 
)?; 1263 assert_eq!( 1264 links, 1265 PagedAppendingCollection { 1266 version: (4, 0), 1267 items: vec![RecordId { 1268 did: "did:plc:asdf-2".into(), 1269 collection: "app.t.c".into(), 1270 rkey: "asdf".into(), 1271 },], 1272 next: None, 1273 total: 4, 1274 } 1275 ); 1276 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1277 }); 1278 1279 test_each_storage!(get_all_counts, |storage| { 1280 storage.push( 1281 &ActionableEvent::CreateLinks { 1282 record_id: RecordId { 1283 did: "did:plc:asdf".into(), 1284 collection: "app.t.c".into(), 1285 rkey: "asdf".into(), 1286 }, 1287 links: vec![ 1288 CollectedLink { 1289 target: Link::Uri("a.com".into()), 1290 path: ".abc.uri".into(), 1291 }, 1292 CollectedLink { 1293 target: Link::Uri("a.com".into()), 1294 path: ".def.uri".into(), 1295 }, 1296 ], 1297 }, 1298 0, 1299 )?; 1300 assert_eq!(storage.get_all_record_counts("a.com")?, { 1301 let mut counts = HashMap::new(); 1302 let mut t_c_counts = HashMap::new(); 1303 t_c_counts.insert(".abc.uri".into(), 1); 1304 t_c_counts.insert(".def.uri".into(), 1); 1305 counts.insert("app.t.c".into(), t_c_counts); 1306 counts 1307 }); 1308 assert_eq!(storage.get_all_counts("a.com")?, { 1309 let mut counts = HashMap::new(); 1310 let mut t_c_counts = HashMap::new(); 1311 t_c_counts.insert( 1312 ".abc.uri".into(), 1313 CountsByCount { 1314 records: 1, 1315 distinct_dids: 1, 1316 }, 1317 ); 1318 t_c_counts.insert( 1319 ".def.uri".into(), 1320 CountsByCount { 1321 records: 1, 1322 distinct_dids: 1, 1323 }, 1324 ); 1325 counts.insert("app.t.c".into(), t_c_counts); 1326 counts 1327 }); 1328 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1); 1329 }); 1330}