Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 2use anyhow::Result; 3use serde::{Deserialize, Serialize}; 4use std::collections::HashMap; 5 6pub mod mem_store; 7pub use mem_store::MemStorage; 8 9#[cfg(feature = "rocks")] 10pub mod rocks_store; 11#[cfg(feature = "rocks")] 12pub use rocks_store::RocksStorage; 13 14#[derive(Debug, PartialEq)] 15pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 17 pub items: Vec<T>, 18 pub next: Option<u64>, 19 pub total: u64, 20} 21 22#[derive(Debug, Deserialize, Serialize, PartialEq)] 23pub struct StorageStats { 24 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. 25 /// for example: new user A follows users B and C. this count will only increment by one, for A. 26 pub dids: u64, 27 28 /// estimate targets * distinct (collection, path)s to reference them. 29 /// distinct targets alone are currently challenging to estimate. 30 pub targetables: u64, 31 32 /// estimate of the count of atproto records seen that contain links. 33 /// records with multiple links are single-counted. 34 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 35 pub linking_records: u64, 36} 37 38pub trait LinkStorage: Send + Sync { 39 /// jetstream cursor from last saved actions, if available 40 fn get_cursor(&mut self) -> Result<Option<u64>> { 41 Ok(None) 42 } 43 44 fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()>; 45 46 // readers are off from the writer instance 47 fn to_readable(&mut self) -> impl LinkReader; 48} 49 50pub trait LinkReader: Clone + Send + Sync + 'static { 51 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 52 53 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 54 55 fn get_links( 56 &self, 57 target: &str, 58 collection: &str, 59 path: &str, 60 limit: u64, 61 until: Option<u64>, 62 ) -> Result<PagedAppendingCollection<RecordId>>; 63 64 fn get_distinct_dids( 65 &self, 66 target: &str, 67 collection: &str, 68 path: &str, 69 limit: u64, 70 until: Option<u64>, 71 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 72 73 fn get_all_record_counts(&self, _target: &str) 74 -> Result<HashMap<String, HashMap<String, u64>>>; 75 76 fn get_all_counts( 77 &self, 78 _target: &str, 79 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>>; 80 81 /// assume all stats are estimates, since exact counts are very challenging for LSMs 82 fn get_stats(&self) -> Result<StorageStats>; 83} 84 85#[cfg(test)] 86mod tests { 87 use super::*; 88 use links::{CollectedLink, Link}; 89 use std::ops::RangeBounds; 90 91 macro_rules! test_each_storage { 92 ($test_name:ident, |$storage_label:ident| $test_code:block) => { 93 #[test] 94 fn $test_name() -> Result<()> { 95 { 96 println!("=> testing with memstorage backend"); 97 #[allow(unused_mut)] 98 let mut $storage_label = MemStorage::new(); 99 $test_code 100 } 101 102 #[cfg(feature = "rocks")] 103 { 104 println!("=> testing with rocksdb backend"); 105 let rocks_db_path = tempfile::tempdir()?; 106 #[allow(unused_mut)] 107 let mut $storage_label = RocksStorage::new(rocks_db_path.path())?; 108 $test_code 109 } 110 111 Ok(()) 112 } 113 }; 114 } 115 116 fn assert_stats( 117 stats: StorageStats, 118 dids: impl RangeBounds<u64>, 119 targetables: impl RangeBounds<u64>, 120 linking_records: impl RangeBounds<u64>, 121 ) { 122 fn check(name: &str, stat: u64, rb: impl RangeBounds<u64>) { 123 assert!( 124 rb.contains(&stat), 125 "{name:?}: {stat:?} not in range {:?}–{:?}", 126 rb.start_bound(), 127 rb.end_bound() 128 ); 129 } 130 check("dids", stats.dids, dids); 131 check("targetables", stats.targetables, targetables); 132 check("linking_records", stats.linking_records, linking_records); 133 } 134 135 test_each_storage!(test_empty, |storage| { 136 assert_eq!(storage.get_count("", "", "")?, 0); 137 assert_eq!(storage.get_count("a", "b", "c")?, 0); 138 assert_eq!( 139 storage.get_count( 140 "at://did:plc:b3rzzkblqsxhr3dgcueymkqe/app.bsky.feed.post/3lf6yc4drhk2f", 141 "app.t.c", 142 ".reply.parent.uri" 143 )?, 144 0 145 ); 146 assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 147 assert_eq!( 148 storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?, 149 PagedAppendingCollection { 150 version: (0, 0), 151 items: vec![], 152 next: None, 153 total: 0, 154 } 155 ); 156 assert_eq!( 157 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 158 PagedAppendingCollection { 159 version: (0, 0), 160 items: vec![], 161 next: None, 162 total: 0, 163 } 164 ); 165 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 166 assert_eq!( 167 storage.get_all_record_counts("bad-example.com")?, 168 HashMap::new() 169 ); 170 171 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 172 }); 173 174 test_each_storage!(test_add_link, |storage| { 175 storage.push( 176 &ActionableEvent::CreateLinks { 177 record_id: RecordId { 178 did: "did:plc:asdf".into(), 179 collection: "app.t.c".into(), 180 rkey: "fdsa".into(), 181 }, 182 links: vec![CollectedLink { 183 target: Link::Uri("e.com".into()), 184 path: ".abc.uri".into(), 185 }], 186 }, 187 0, 188 )?; 189 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 190 assert_eq!( 191 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 192 1 193 ); 194 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 195 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 196 assert_eq!( 197 storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 198 0 199 ); 200 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 201 }); 202 203 test_each_storage!(test_links, |storage| { 204 storage.push( 205 &ActionableEvent::CreateLinks { 206 record_id: RecordId { 207 did: "did:plc:asdf".into(), 208 collection: "app.t.c".into(), 209 rkey: "fdsa".into(), 210 }, 211 links: vec![CollectedLink { 212 target: Link::Uri("e.com".into()), 213 path: ".abc.uri".into(), 214 }], 215 }, 216 0, 217 )?; 218 219 // delete under the wrong collection 220 storage.push( 221 &ActionableEvent::DeleteRecord(RecordId { 222 did: "did:plc:asdf".into(), 223 collection: "app.test.wrongcollection".into(), 224 rkey: "fdsa".into(), 225 }), 226 0, 227 )?; 228 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 229 230 // delete under the wrong rkey 231 storage.push( 232 &ActionableEvent::DeleteRecord(RecordId { 233 did: "did:plc:asdf".into(), 234 collection: "app.t.c".into(), 235 rkey: "wrongkey".into(), 236 }), 237 0, 238 )?; 239 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 240 241 // finally actually delete it 242 storage.push( 243 &ActionableEvent::DeleteRecord(RecordId { 244 did: "did:plc:asdf".into(), 245 collection: "app.t.c".into(), 246 rkey: "fdsa".into(), 247 }), 248 0, 249 )?; 250 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 251 252 // put it back 253 storage.push( 254 &ActionableEvent::CreateLinks { 255 record_id: RecordId { 256 did: "did:plc:asdf".into(), 257 collection: "app.t.c".into(), 258 rkey: "fdsa".into(), 259 }, 260 links: vec![CollectedLink { 261 target: Link::Uri("e.com".into()), 262 path: ".abc.uri".into(), 263 }], 264 }, 265 0, 266 )?; 267 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 268 assert_eq!( 269 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 270 1 271 ); 272 273 // add another link from this user 274 storage.push( 275 &ActionableEvent::CreateLinks { 276 record_id: RecordId { 277 did: "did:plc:asdf".into(), 278 collection: "app.t.c".into(), 279 rkey: "fdsa2".into(), 280 }, 281 links: vec![CollectedLink { 282 target: Link::Uri("e.com".into()), 283 path: ".abc.uri".into(), 284 }], 285 }, 286 0, 287 )?; 288 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 289 assert_eq!( 290 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 291 1 292 ); 293 294 // add a link from someone else 295 storage.push( 296 &ActionableEvent::CreateLinks { 297 record_id: RecordId { 298 did: "did:plc:asdfasdf".into(), 299 collection: "app.t.c".into(), 300 rkey: "fdsa".into(), 301 }, 302 links: vec![CollectedLink { 303 target: Link::Uri("e.com".into()), 304 path: ".abc.uri".into(), 305 }], 306 }, 307 0, 308 )?; 309 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 310 assert_eq!( 311 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 312 2 313 ); 314 315 // aaaand delete the first one again 316 storage.push( 317 &ActionableEvent::DeleteRecord(RecordId { 318 did: "did:plc:asdf".into(), 319 collection: "app.t.c".into(), 320 rkey: "fdsa".into(), 321 }), 322 0, 323 )?; 324 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 325 assert_eq!( 326 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 327 2 328 ); 329 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 330 }); 331 332 test_each_storage!(test_two_user_links_delete_one, |storage| { 333 // create the first link 334 storage.push( 335 &ActionableEvent::CreateLinks { 336 record_id: RecordId { 337 did: "did:plc:asdf".into(), 338 collection: "app.t.c".into(), 339 rkey: "A".into(), 340 }, 341 links: vec![CollectedLink { 342 target: Link::Uri("e.com".into()), 343 path: ".abc.uri".into(), 344 }], 345 }, 346 0, 347 )?; 348 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 349 assert_eq!( 350 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 351 1 352 ); 353 354 // create the second link (same user, different rkey) 355 storage.push( 356 &ActionableEvent::CreateLinks { 357 record_id: RecordId { 358 did: "did:plc:asdf".into(), 359 collection: "app.t.c".into(), 360 rkey: "B".into(), 361 }, 362 links: vec![CollectedLink { 363 target: Link::Uri("e.com".into()), 364 path: ".abc.uri".into(), 365 }], 366 }, 367 0, 368 )?; 369 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 370 assert_eq!( 371 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 372 1 373 ); 374 375 // aaaand delete the first link 376 storage.push( 377 &ActionableEvent::DeleteRecord(RecordId { 378 did: "did:plc:asdf".into(), 379 collection: "app.t.c".into(), 380 rkey: "A".into(), 381 }), 382 0, 383 )?; 384 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 385 assert_eq!( 386 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 387 1 388 ); 389 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 390 }); 391 392 test_each_storage!(test_accounts, |storage| { 393 // create two links 394 storage.push( 395 &ActionableEvent::CreateLinks { 396 record_id: RecordId { 397 did: "did:plc:asdf".into(), 398 collection: "app.t.c".into(), 399 rkey: "A".into(), 400 }, 401 links: vec![CollectedLink { 402 target: Link::Uri("a.com".into()), 403 path: ".abc.uri".into(), 404 }], 405 }, 406 0, 407 )?; 408 storage.push( 409 &ActionableEvent::CreateLinks { 410 record_id: RecordId { 411 did: "did:plc:asdf".into(), 412 collection: "app.t.c".into(), 413 rkey: "B".into(), 414 }, 415 links: vec![CollectedLink { 416 target: Link::Uri("b.com".into()), 417 path: ".abc.uri".into(), 418 }], 419 }, 420 0, 421 )?; 422 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 423 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 1); 424 425 // and a third from a different account 426 storage.push( 427 &ActionableEvent::CreateLinks { 428 record_id: RecordId { 429 did: "did:plc:fdsa".into(), 430 collection: "app.t.c".into(), 431 rkey: "A".into(), 432 }, 433 links: vec![CollectedLink { 434 target: Link::Uri("a.com".into()), 435 path: ".abc.uri".into(), 436 }], 437 }, 438 0, 439 )?; 440 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 2); 441 442 // delete the first account 443 storage.push(&ActionableEvent::DeleteAccount("did:plc:asdf".into()), 0)?; 444 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 445 assert_eq!(storage.get_count("b.com", "app.t.c", ".abc.uri")?, 0); 446 assert_stats(storage.get_stats()?, 1..=2, 2..=2, 1..=1); 447 }); 448 449 test_each_storage!(multi_link, |storage| { 450 storage.push( 451 &ActionableEvent::CreateLinks { 452 record_id: RecordId { 453 did: "did:plc:asdf".into(), 454 collection: "app.t.c".into(), 455 rkey: "fdsa".into(), 456 }, 457 links: vec![ 458 CollectedLink { 459 target: Link::Uri("e.com".into()), 460 path: ".abc.uri".into(), 461 }, 462 CollectedLink { 463 target: Link::Uri("f.com".into()), 464 path: ".xyz[].uri".into(), 465 }, 466 CollectedLink { 467 target: Link::Uri("g.com".into()), 468 path: ".xyz[].uri".into(), 469 }, 470 ], 471 }, 472 0, 473 )?; 474 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 475 assert_eq!( 476 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 477 1 478 ); 479 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 480 assert_eq!( 481 storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 482 1 483 ); 484 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 485 assert_eq!( 486 storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 487 1 488 ); 489 490 storage.push( 491 &ActionableEvent::DeleteRecord(RecordId { 492 did: "did:plc:asdf".into(), 493 collection: "app.t.c".into(), 494 rkey: "fdsa".into(), 495 }), 496 0, 497 )?; 498 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 499 assert_eq!( 500 storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 501 0 502 ); 503 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 504 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 505 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); 506 }); 507 508 test_each_storage!(update_link, |storage| { 509 // create the links 510 storage.push( 511 &ActionableEvent::CreateLinks { 512 record_id: RecordId { 513 did: "did:plc:asdf".into(), 514 collection: "app.t.c".into(), 515 rkey: "fdsa".into(), 516 }, 517 links: vec![ 518 CollectedLink { 519 target: Link::Uri("e.com".into()), 520 path: ".abc.uri".into(), 521 }, 522 CollectedLink { 523 target: Link::Uri("f.com".into()), 524 path: ".xyz[].uri".into(), 525 }, 526 CollectedLink { 527 target: Link::Uri("g.com".into()), 528 path: ".xyz[].uri".into(), 529 }, 530 ], 531 }, 532 0, 533 )?; 534 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 535 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 536 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 537 538 // update them 539 storage.push( 540 &ActionableEvent::UpdateLinks { 541 record_id: RecordId { 542 did: "did:plc:asdf".into(), 543 collection: "app.t.c".into(), 544 rkey: "fdsa".into(), 545 }, 546 new_links: vec![ 547 CollectedLink { 548 target: Link::Uri("h.com".into()), 549 path: ".abc.uri".into(), 550 }, 551 CollectedLink { 552 target: Link::Uri("f.com".into()), 553 path: ".xyz[].uri".into(), 554 }, 555 CollectedLink { 556 target: Link::Uri("i.com".into()), 557 path: ".xyz[].uri".into(), 558 }, 559 ], 560 }, 561 0, 562 )?; 563 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 564 assert_eq!(storage.get_count("h.com", "app.t.c", ".abc.uri")?, 1); 565 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 566 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 567 assert_eq!(storage.get_count("i.com", "app.t.c", ".xyz[].uri")?, 1); 568 assert_stats(storage.get_stats()?, 1..=1, 5..=5, 1..=1); 569 }); 570 571 test_each_storage!(update_no_links_to_links, |storage| { 572 // update without prior create (consumer would have filtered out the original) 573 storage.push( 574 &ActionableEvent::UpdateLinks { 575 record_id: RecordId { 576 did: "did:plc:asdf".into(), 577 collection: "app.t.c".into(), 578 rkey: "asdf".into(), 579 }, 580 new_links: vec![CollectedLink { 581 target: Link::Uri("a.com".into()), 582 path: ".abc.uri".into(), 583 }], 584 }, 585 0, 586 )?; 587 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 588 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 589 }); 590 591 test_each_storage!(delete_multi_link_same_target, |storage| { 592 storage.push( 593 &ActionableEvent::CreateLinks { 594 record_id: RecordId { 595 did: "did:plc:asdf".into(), 596 collection: "app.t.c".into(), 597 rkey: "asdf".into(), 598 }, 599 links: vec![ 600 CollectedLink { 601 target: Link::Uri("a.com".into()), 602 path: ".abc.uri".into(), 603 }, 604 CollectedLink { 605 target: Link::Uri("a.com".into()), 606 path: ".def.uri".into(), 607 }, 608 ], 609 }, 610 0, 611 )?; 612 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 1); 613 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 1); 614 615 storage.push( 616 &ActionableEvent::DeleteRecord(RecordId { 617 did: "did:plc:asdf".into(), 618 collection: "app.t.c".into(), 619 rkey: "asdf".into(), 620 }), 621 0, 622 )?; 623 assert_eq!(storage.get_count("a.com", "app.t.c", ".abc.uri")?, 0); 624 assert_eq!(storage.get_count("a.com", "app.t.c", ".def.uri")?, 0); 625 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 0..=0); 626 }); 627 628 test_each_storage!(get_links_basic, |storage| { 629 storage.push( 630 &ActionableEvent::CreateLinks { 631 record_id: RecordId { 632 did: "did:plc:asdf".into(), 633 collection: "app.t.c".into(), 634 rkey: "asdf".into(), 635 }, 636 links: vec![CollectedLink { 637 target: Link::Uri("a.com".into()), 638 path: ".abc.uri".into(), 639 }], 640 }, 641 0, 642 )?; 643 assert_eq!( 644 storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?, 645 PagedAppendingCollection { 646 version: (1, 0), 647 items: vec![RecordId { 648 did: "did:plc:asdf".into(), 649 collection: "app.t.c".into(), 650 rkey: "asdf".into(), 651 }], 652 next: None, 653 total: 1, 654 } 655 ); 656 assert_eq!( 657 storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 658 PagedAppendingCollection { 659 version: (1, 0), 660 items: vec!["did:plc:asdf".into()], 661 next: None, 662 total: 1, 663 } 664 ); 665 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 666 }); 667 668 test_each_storage!(get_links_paged, |storage| { 669 for i in 1..=5 { 670 storage.push( 671 &ActionableEvent::CreateLinks { 672 record_id: RecordId { 673 did: format!("did:plc:asdf-{i}").into(), 674 collection: "app.t.c".into(), 675 rkey: "asdf".into(), 676 }, 677 links: vec![CollectedLink { 678 target: Link::Uri("a.com".into()), 679 path: ".abc.uri".into(), 680 }], 681 }, 682 0, 683 )?; 684 } 685 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 686 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 687 assert_eq!( 688 links, 689 PagedAppendingCollection { 690 version: (5, 0), 691 items: vec![ 692 RecordId { 693 did: "did:plc:asdf-5".into(), 694 collection: "app.t.c".into(), 695 rkey: "asdf".into(), 696 }, 697 RecordId { 698 did: "did:plc:asdf-4".into(), 699 collection: "app.t.c".into(), 700 rkey: "asdf".into(), 701 }, 702 ], 703 next: Some(3), 704 total: 5, 705 } 706 ); 707 assert_eq!( 708 dids, 709 PagedAppendingCollection { 710 version: (5, 0), 711 items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 712 next: Some(3), 713 total: 5, 714 } 715 ); 716 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 717 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 718 assert_eq!( 719 links, 720 PagedAppendingCollection { 721 version: (5, 0), 722 items: vec![ 723 RecordId { 724 did: "did:plc:asdf-3".into(), 725 collection: "app.t.c".into(), 726 rkey: "asdf".into(), 727 }, 728 RecordId { 729 did: "did:plc:asdf-2".into(), 730 collection: "app.t.c".into(), 731 rkey: "asdf".into(), 732 }, 733 ], 734 next: Some(1), 735 total: 5, 736 } 737 ); 738 assert_eq!( 739 dids, 740 PagedAppendingCollection { 741 version: (5, 0), 742 items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 743 next: Some(1), 744 total: 5, 745 } 746 ); 747 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 748 let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 749 assert_eq!( 750 links, 751 PagedAppendingCollection { 752 version: (5, 0), 753 items: vec![RecordId { 754 did: "did:plc:asdf-1".into(), 755 collection: "app.t.c".into(), 756 rkey: "asdf".into(), 757 },], 758 next: None, 759 total: 5, 760 } 761 ); 762 assert_eq!( 763 dids, 764 PagedAppendingCollection { 765 version: (5, 0), 766 items: vec!["did:plc:asdf-1".into()], 767 next: None, 768 total: 5, 769 } 770 ); 771 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 772 }); 773 774 test_each_storage!(get_links_exact_multiple, |storage| { 775 for i in 1..=4 { 776 storage.push( 777 &ActionableEvent::CreateLinks { 778 record_id: RecordId { 779 did: format!("did:plc:asdf-{i}").into(), 780 collection: "app.t.c".into(), 781 rkey: "asdf".into(), 782 }, 783 links: vec![CollectedLink { 784 target: Link::Uri("a.com".into()), 785 path: ".abc.uri".into(), 786 }], 787 }, 788 0, 789 )?; 790 } 791 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 792 assert_eq!( 793 links, 794 PagedAppendingCollection { 795 version: (4, 0), 796 items: vec![ 797 RecordId { 798 did: "did:plc:asdf-4".into(), 799 collection: "app.t.c".into(), 800 rkey: "asdf".into(), 801 }, 802 RecordId { 803 did: "did:plc:asdf-3".into(), 804 collection: "app.t.c".into(), 805 rkey: "asdf".into(), 806 }, 807 ], 808 next: Some(2), 809 total: 4, 810 } 811 ); 812 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 813 assert_eq!( 814 links, 815 PagedAppendingCollection { 816 version: (4, 0), 817 items: vec![ 818 RecordId { 819 did: "did:plc:asdf-2".into(), 820 collection: "app.t.c".into(), 821 rkey: "asdf".into(), 822 }, 823 RecordId { 824 did: "did:plc:asdf-1".into(), 825 collection: "app.t.c".into(), 826 rkey: "asdf".into(), 827 }, 828 ], 829 next: None, 830 total: 4, 831 } 832 ); 833 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 834 }); 835 836 test_each_storage!(page_links_while_new_links_arrive, |storage| { 837 for i in 1..=4 { 838 storage.push( 839 &ActionableEvent::CreateLinks { 840 record_id: RecordId { 841 did: format!("did:plc:asdf-{i}").into(), 842 collection: "app.t.c".into(), 843 rkey: "asdf".into(), 844 }, 845 links: vec![CollectedLink { 846 target: Link::Uri("a.com".into()), 847 path: ".abc.uri".into(), 848 }], 849 }, 850 0, 851 )?; 852 } 853 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 854 assert_eq!( 855 links, 856 PagedAppendingCollection { 857 version: (4, 0), 858 items: vec![ 859 RecordId { 860 did: "did:plc:asdf-4".into(), 861 collection: "app.t.c".into(), 862 rkey: "asdf".into(), 863 }, 864 RecordId { 865 did: "did:plc:asdf-3".into(), 866 collection: "app.t.c".into(), 867 rkey: "asdf".into(), 868 }, 869 ], 870 next: Some(2), 871 total: 4, 872 } 873 ); 874 storage.push( 875 &ActionableEvent::CreateLinks { 876 record_id: RecordId { 877 did: "did:plc:asdf-5".into(), 878 collection: "app.t.c".into(), 879 rkey: "asdf".into(), 880 }, 881 links: vec![CollectedLink { 882 target: Link::Uri("a.com".into()), 883 path: ".abc.uri".into(), 884 }], 885 }, 886 0, 887 )?; 888 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 889 assert_eq!( 890 links, 891 PagedAppendingCollection { 892 version: (5, 0), 893 items: vec![ 894 RecordId { 895 did: "did:plc:asdf-2".into(), 896 collection: "app.t.c".into(), 897 rkey: "asdf".into(), 898 }, 899 RecordId { 900 did: "did:plc:asdf-1".into(), 901 collection: "app.t.c".into(), 902 rkey: "asdf".into(), 903 }, 904 ], 905 next: None, 906 total: 5, 907 } 908 ); 909 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 910 }); 911 912 test_each_storage!(page_links_while_some_are_deleted, |storage| { 913 for i in 1..=4 { 914 storage.push( 915 &ActionableEvent::CreateLinks { 916 record_id: RecordId { 917 did: format!("did:plc:asdf-{i}").into(), 918 collection: "app.t.c".into(), 919 rkey: "asdf".into(), 920 }, 921 links: vec![CollectedLink { 922 target: Link::Uri("a.com".into()), 923 path: ".abc.uri".into(), 924 }], 925 }, 926 0, 927 )?; 928 } 929 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 930 assert_eq!( 931 links, 932 PagedAppendingCollection { 933 version: (4, 0), 934 items: vec![ 935 RecordId { 936 did: "did:plc:asdf-4".into(), 937 collection: "app.t.c".into(), 938 rkey: "asdf".into(), 939 }, 940 RecordId { 941 did: "did:plc:asdf-3".into(), 942 collection: "app.t.c".into(), 943 rkey: "asdf".into(), 944 }, 945 ], 946 next: Some(2), 947 total: 4, 948 } 949 ); 950 storage.push( 951 &ActionableEvent::DeleteRecord(RecordId { 952 did: "did:plc:asdf-2".into(), 953 collection: "app.t.c".into(), 954 rkey: "asdf".into(), 955 }), 956 0, 957 )?; 958 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 959 assert_eq!( 960 links, 961 PagedAppendingCollection { 962 version: (4, 1), 963 items: vec![RecordId { 964 did: "did:plc:asdf-1".into(), 965 collection: "app.t.c".into(), 966 rkey: "asdf".into(), 967 },], 968 next: None, 969 total: 3, 970 } 971 ); 972 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 3..=3); 973 }); 974 975 test_each_storage!(page_links_accounts_inactive, |storage| { 976 for i in 1..=4 { 977 storage.push( 978 &ActionableEvent::CreateLinks { 979 record_id: RecordId { 980 did: format!("did:plc:asdf-{i}").into(), 981 collection: "app.t.c".into(), 982 rkey: "asdf".into(), 983 }, 984 links: vec![CollectedLink { 985 target: Link::Uri("a.com".into()), 986 path: ".abc.uri".into(), 987 }], 988 }, 989 0, 990 )?; 991 } 992 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 993 assert_eq!( 994 links, 995 PagedAppendingCollection { 996 version: (4, 0), 997 items: vec![ 998 RecordId { 999 did: "did:plc:asdf-4".into(), 1000 collection: "app.t.c".into(), 1001 rkey: "asdf".into(), 1002 }, 1003 RecordId { 1004 did: "did:plc:asdf-3".into(), 1005 collection: "app.t.c".into(), 1006 rkey: "asdf".into(), 1007 }, 1008 ], 1009 next: Some(2), 1010 total: 4, 1011 } 1012 ); 1013 storage.push( 1014 &ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()), 1015 0, 1016 )?; 1017 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 1018 assert_eq!( 1019 links, 1020 PagedAppendingCollection { 1021 version: (4, 0), 1022 items: vec![RecordId { 1023 did: "did:plc:asdf-2".into(), 1024 collection: "app.t.c".into(), 1025 rkey: "asdf".into(), 1026 },], 1027 next: None, 1028 total: 4, 1029 } 1030 ); 1031 assert_stats(storage.get_stats()?, 4..=4, 1..=1, 4..=4); 1032 }); 1033 1034 test_each_storage!(get_all_counts, |storage| { 1035 storage.push( 1036 &ActionableEvent::CreateLinks { 1037 record_id: RecordId { 1038 did: "did:plc:asdf".into(), 1039 collection: "app.t.c".into(), 1040 rkey: "asdf".into(), 1041 }, 1042 links: vec![ 1043 CollectedLink { 1044 target: Link::Uri("a.com".into()), 1045 path: ".abc.uri".into(), 1046 }, 1047 CollectedLink { 1048 target: Link::Uri("a.com".into()), 1049 path: ".def.uri".into(), 1050 }, 1051 ], 1052 }, 1053 0, 1054 )?; 1055 assert_eq!(storage.get_all_record_counts("a.com")?, { 1056 let mut counts = HashMap::new(); 1057 let mut t_c_counts = HashMap::new(); 1058 t_c_counts.insert(".abc.uri".into(), 1); 1059 t_c_counts.insert(".def.uri".into(), 1); 1060 counts.insert("app.t.c".into(), t_c_counts); 1061 counts 1062 }); 1063 assert_eq!(storage.get_all_counts("a.com")?, { 1064 let mut counts = HashMap::new(); 1065 let mut t_c_counts = HashMap::new(); 1066 t_c_counts.insert( 1067 ".abc.uri".into(), 1068 CountsByCount { 1069 records: 1, 1070 distinct_dids: 1, 1071 }, 1072 ); 1073 t_c_counts.insert( 1074 ".def.uri".into(), 1075 CountsByCount { 1076 records: 1, 1077 distinct_dids: 1, 1078 }, 1079 ); 1080 counts.insert("app.t.c".into(), t_c_counts); 1081 counts 1082 }); 1083 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1); 1084 }); 1085}