Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 19 kB view raw
1pub mod consumer; 2pub mod db_types; 3pub mod error; 4pub mod file_consumer; 5pub mod index_html; 6pub mod server; 7pub mod storage; 8pub mod storage_fjall; 9pub mod store_types; 10 11use crate::db_types::{EncodingError, EncodingResult}; 12use crate::error::BatchInsertError; 13use crate::store_types::{CountsValue, SketchSecretPrefix}; 14use cardinality_estimator_safe::{Element, Sketch}; 15use error::FirehoseEventError; 16use jetstream::events::{CommitEvent, CommitOp, Cursor}; 17use jetstream::exports::{Did, Nsid, RecordKey}; 18use schemars::JsonSchema; 19use serde::Serialize; 20use serde_json::value::RawValue; 21use sha2::Sha256; 22use std::collections::HashMap; 23use std::time::Duration; 24 25fn did_element(sketch_secret: &SketchSecretPrefix, did: &Did) -> Element<14> { 26 Element::from_digest_with_prefix::<Sha256>(sketch_secret, did.as_bytes()) 27} 28 29pub fn nice_duration(dt: Duration) -> String { 30 let secs = dt.as_secs_f64(); 31 if secs < 1. { 32 return format!("{:.0}ms", secs * 1000.); 33 } 34 if secs < 60. { 35 return format!("{secs:.02}s"); 36 } 37 let mins = (secs / 60.).floor(); 38 let rsecs = secs - (mins * 60.); 39 if mins < 60. { 40 return format!("{mins:.0}m{rsecs:.0}s"); 41 } 42 let hrs = (mins / 60.).floor(); 43 let rmins = mins - (hrs * 60.); 44 if hrs < 24. { 45 return format!("{hrs:.0}h{rmins:.0}m{rsecs:.0}s"); 46 } 47 let days = (hrs / 24.).floor(); 48 let rhrs = hrs - (days * 24.); 49 format!("{days:.0}d{rhrs:.0}h{rmins:.0}m{rsecs:.0}s") 50} 51 52#[derive(Debug, Default, Clone)] 53pub struct CollectionCommits<const LIMIT: usize> { 54 pub creates: usize, 55 pub updates: usize, 56 pub deletes: usize, 57 pub dids_estimate: Sketch<14>, 58 pub commits: Vec<UFOsCommit>, 59 head: usize, 60} 61 62impl<const LIMIT: usize> CollectionCommits<LIMIT> { 63 fn advance_head(&mut self) { 64 self.head += 1; 65 if self.head > LIMIT { 66 self.head = 0; 67 } 68 } 69 /// lossy-ish commit insertion 70 /// 71 /// - new commits are *always* added to the batch or else rejected as full. 72 /// - when LIMIT is reached, new commits can displace existing `creates`. 73 /// `update`s and `delete`s are *never* displaced. 74 /// - if all batched `creates` have been displaced, the batch is full. 75 /// 76 /// in general it's rare for commits to be displaced except for very high- 77 /// volume collections such as `app.bsky.feed.like`. 78 /// 79 /// it could be nice in the future to retain all batched commits and just 80 /// drop new `creates` after a limit instead. 81 pub fn truncating_insert( 82 &mut self, 83 commit: UFOsCommit, 84 sketch_secret: &SketchSecretPrefix, 85 ) -> Result<(), BatchInsertError> { 86 if (self.updates + self.deletes) == LIMIT { 87 // nothing can be displaced (only `create`s may be displaced) 88 return Err(BatchInsertError::BatchFull(commit)); 89 } 90 91 // every kind of commit counts as "user activity" 92 self.dids_estimate 93 .insert(did_element(sketch_secret, &commit.did)); 94 95 match commit.action { 96 CommitAction::Put(PutAction { 97 is_update: false, .. 98 }) => { 99 self.creates += 1; 100 } 101 CommitAction::Put(PutAction { 102 is_update: true, .. 103 }) => { 104 self.updates += 1; 105 } 106 CommitAction::Cut => { 107 self.deletes += 1; 108 } 109 } 110 111 if self.commits.len() < LIMIT { 112 // normal insert: there's space left to put a new commit at the end 113 self.commits.push(commit); 114 } else { 115 // displacement insert: find an old `create` we can displace 116 let head_started_at = self.head; 117 loop { 118 let candidate = self 119 .commits 120 .get_mut(self.head) 121 .ok_or(BatchInsertError::BatchOverflow(self.head))?; 122 if candidate.action.is_create() { 123 *candidate = commit; 124 break; 125 } 126 self.advance_head(); 127 if self.head == head_started_at { 128 return Err(BatchInsertError::BatchForever); 129 } 130 } 131 } 132 133 Ok(()) 134 } 135} 136 137#[derive(Debug, Clone)] 138pub struct DeleteAccount { 139 pub did: Did, 140 pub cursor: Cursor, 141} 142 143#[derive(Debug, Clone)] 144pub enum CommitAction { 145 Put(PutAction), 146 Cut, 147} 148impl CommitAction { 149 pub fn is_create(&self) -> bool { 150 match self { 151 CommitAction::Put(PutAction { is_update, .. }) => !is_update, 152 CommitAction::Cut => false, 153 } 154 } 155} 156 157#[derive(Debug, Clone)] 158pub struct PutAction { 159 record: Box<RawValue>, 160 is_update: bool, 161} 162 163#[derive(Debug, Clone)] 164pub struct UFOsCommit { 165 cursor: Cursor, 166 did: Did, 167 rkey: RecordKey, 168 rev: String, 169 action: CommitAction, 170} 171 172#[derive(Debug, Clone, Serialize)] 173pub struct UFOsRecord { 174 pub cursor: Cursor, 175 pub did: Did, 176 pub collection: Nsid, 177 pub rkey: RecordKey, 178 pub rev: String, 179 // TODO: cid? 180 pub record: Box<RawValue>, 181 pub is_update: bool, 182} 183 184impl UFOsCommit { 185 pub fn from_commit_info( 186 commit: CommitEvent, 187 did: Did, 188 cursor: Cursor, 189 ) -> Result<(Self, Nsid), FirehoseEventError> { 190 let action = match commit.operation { 191 CommitOp::Delete => CommitAction::Cut, 192 cru => CommitAction::Put(PutAction { 193 record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?, 194 is_update: cru == CommitOp::Update, 195 }), 196 }; 197 let batched = Self { 198 cursor, 199 did, 200 rkey: commit.rkey, 201 rev: commit.rev, 202 action, 203 }; 204 Ok((batched, commit.collection)) 205 } 206} 207 208#[derive(Debug, Default, Clone)] 209pub struct EventBatch<const LIMIT: usize> { 210 pub commits_by_nsid: HashMap<Nsid, CollectionCommits<LIMIT>>, 211 pub account_removes: Vec<DeleteAccount>, 212} 213 214impl<const LIMIT: usize> EventBatch<LIMIT> { 215 pub fn insert_commit_by_nsid( 216 &mut self, 217 collection: &Nsid, 218 commit: UFOsCommit, 219 max_collections: usize, 220 sketch_secret: &SketchSecretPrefix, 221 ) -> Result<(), BatchInsertError> { 222 let map = &mut self.commits_by_nsid; 223 if !map.contains_key(collection) && map.len() >= max_collections { 224 return Err(BatchInsertError::BatchFull(commit)); 225 } 226 map.entry(collection.clone()) 227 .or_default() 228 .truncating_insert(commit, sketch_secret)?; 229 Ok(()) 230 } 231 pub fn total_collections(&self) -> usize { 232 self.commits_by_nsid.len() 233 } 234 pub fn account_removes(&self) -> usize { 235 self.account_removes.len() 236 } 237 pub fn estimate_dids(&self) -> usize { 238 let mut estimator = Sketch::<14>::default(); 239 for commits in self.commits_by_nsid.values() { 240 estimator.merge(&commits.dids_estimate); 241 } 242 estimator.estimate() 243 } 244 pub fn latest_cursor(&self) -> Option<Cursor> { 245 let mut oldest = Cursor::from_start(); 246 for commits in self.commits_by_nsid.values() { 247 for commit in &commits.commits { 248 if commit.cursor > oldest { 249 oldest = commit.cursor; 250 } 251 } 252 } 253 if let Some(del) = self.account_removes.last() { 254 if del.cursor > oldest { 255 oldest = del.cursor; 256 } 257 } 258 if oldest > Cursor::from_start() { 259 Some(oldest) 260 } else { 261 None 262 } 263 } 264 pub fn is_empty(&self) -> bool { 265 self.commits_by_nsid.is_empty() && self.account_removes.is_empty() 266 } 267} 268 269#[derive(Debug, Serialize, JsonSchema)] 270#[serde(rename_all = "camelCase")] 271pub enum ConsumerInfo { 272 Jetstream { 273 endpoint: String, 274 started_at: u64, 275 latest_cursor: Option<u64>, 276 rollup_cursor: Option<u64>, 277 }, 278} 279 280#[derive(Debug, PartialEq, Serialize, JsonSchema)] 281pub struct NsidCount { 282 nsid: String, 283 creates: u64, 284 updates: u64, 285 deletes: u64, 286 dids_estimate: u64, 287} 288impl NsidCount { 289 pub fn new(nsid: &Nsid, counts: &CountsValue) -> Self { 290 let crud = counts.counts(); 291 Self { 292 nsid: nsid.to_string(), 293 creates: crud.creates, 294 updates: crud.updates, 295 deletes: crud.deletes, 296 dids_estimate: counts.dids().estimate() as u64, 297 } 298 } 299} 300 301#[derive(Debug, PartialEq, Serialize, JsonSchema)] 302pub struct PrefixCount { 303 prefix: String, 304 creates: u64, 305 updates: u64, 306 deletes: u64, 307 dids_estimate: u64, 308} 309impl PrefixCount { 310 pub fn new(prefix: &str, counts: &CountsValue) -> Self { 311 let crud = counts.counts(); 312 Self { 313 prefix: prefix.to_string(), 314 creates: crud.creates, 315 updates: crud.updates, 316 deletes: crud.deletes, 317 dids_estimate: counts.dids().estimate() as u64, 318 } 319 } 320} 321 322#[derive(Debug, PartialEq, Serialize, JsonSchema)] 323#[serde(tag = "type", rename_all = "camelCase")] 324pub enum PrefixChild { 325 Collection(NsidCount), 326 Prefix(PrefixCount), 327} 328 329#[derive(Debug, Serialize, JsonSchema)] 330pub struct NsidPrefix(String); 331impl NsidPrefix { 332 /// Input must not include a trailing dot. 333 pub fn new(pre: &str) -> EncodingResult<Self> { 334 // it's a valid prefix if appending `.name` makes it a valid NSID 335 Nsid::new(format!("{pre}.name")).map_err(EncodingError::BadAtriumStringType)?; 336 // hack (shouldn't really be here): reject prefixes that aren't at least 2 segments long 337 if !pre.contains('.') { 338 return Err(EncodingError::NotEnoughNsidSegments); 339 } 340 Ok(Self(pre.to_string())) 341 } 342 pub fn is_group_of(&self, other: &Nsid) -> bool { 343 assert!( 344 other.as_str().starts_with(&self.0), 345 "must be a prefix of other" 346 ); 347 self.0 == other.domain_authority() 348 } 349 /// The prefix as initialized (no trailing dot) 350 pub fn as_str(&self) -> &str { 351 self.0.as_str() 352 } 353 /// The prefix with a trailing `.` appended to avoid matching a longer segment 354 pub fn terminated(&self) -> String { 355 format!("{}.", self.0) 356 } 357} 358 359#[derive(Debug, Serialize, JsonSchema)] 360pub struct JustCount { 361 creates: u64, 362 updates: u64, 363 deletes: u64, 364 dids_estimate: u64, 365} 366 367#[derive(Debug)] 368pub enum OrderCollectionsBy { 369 Lexi { cursor: Option<Vec<u8>> }, 370 RecordsCreated, 371 DidsEstimate, 372} 373impl Default for OrderCollectionsBy { 374 fn default() -> Self { 375 Self::Lexi { cursor: None } 376 } 377} 378 379#[cfg(test)] 380mod tests { 381 use super::*; 382 383 #[test] 384 fn test_truncating_insert_truncates() -> anyhow::Result<()> { 385 let mut commits: CollectionCommits<2> = Default::default(); 386 387 commits.truncating_insert( 388 UFOsCommit { 389 cursor: Cursor::from_raw_u64(100), 390 did: Did::new("did:plc:whatever".to_string()).unwrap(), 391 rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 392 rev: "rev-asdf".to_string(), 393 action: CommitAction::Put(PutAction { 394 record: RawValue::from_string("{}".to_string())?, 395 is_update: false, 396 }), 397 }, 398 &[0u8; 16], 399 )?; 400 401 commits.truncating_insert( 402 UFOsCommit { 403 cursor: Cursor::from_raw_u64(101), 404 did: Did::new("did:plc:whatever".to_string()).unwrap(), 405 rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(), 406 rev: "rev-asdg".to_string(), 407 action: CommitAction::Put(PutAction { 408 record: RawValue::from_string("{}".to_string())?, 409 is_update: false, 410 }), 411 }, 412 &[0u8; 16], 413 )?; 414 415 commits.truncating_insert( 416 UFOsCommit { 417 cursor: Cursor::from_raw_u64(102), 418 did: Did::new("did:plc:whatever".to_string()).unwrap(), 419 rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(), 420 rev: "rev-asdh".to_string(), 421 action: CommitAction::Put(PutAction { 422 record: RawValue::from_string("{}".to_string())?, 423 is_update: false, 424 }), 425 }, 426 &[0u8; 16], 427 )?; 428 429 assert_eq!(commits.creates, 3); 430 assert_eq!(commits.dids_estimate.estimate(), 1); 431 assert_eq!(commits.commits.len(), 2); 432 433 let mut found_first = false; 434 let mut found_last = false; 435 for commit in commits.commits { 436 match commit.rev.as_ref() { 437 "rev-asdf" => { 438 found_first = true; 439 } 440 "rev-asdh" => { 441 found_last = true; 442 } 443 _ => {} 444 } 445 } 446 assert!(!found_first); 447 assert!(found_last); 448 449 Ok(()) 450 } 451 452 #[test] 453 fn test_truncating_insert_counts_updates() -> anyhow::Result<()> { 454 let mut commits: CollectionCommits<2> = Default::default(); 455 456 commits.truncating_insert( 457 UFOsCommit { 458 cursor: Cursor::from_raw_u64(100), 459 did: Did::new("did:plc:whatever".to_string()).unwrap(), 460 rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 461 rev: "rev-asdf".to_string(), 462 action: CommitAction::Put(PutAction { 463 record: RawValue::from_string("{}".to_string())?, 464 is_update: true, 465 }), 466 }, 467 &[0u8; 16], 468 )?; 469 470 assert_eq!(commits.creates, 0); 471 assert_eq!(commits.updates, 1); 472 assert_eq!(commits.deletes, 0); 473 assert_eq!(commits.dids_estimate.estimate(), 1); 474 assert_eq!(commits.commits.len(), 1); 475 Ok(()) 476 } 477 478 #[test] 479 fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> { 480 let mut commits: CollectionCommits<2> = Default::default(); 481 482 commits.truncating_insert( 483 UFOsCommit { 484 cursor: Cursor::from_raw_u64(100), 485 did: Did::new("did:plc:whatever".to_string()).unwrap(), 486 rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 487 rev: "rev-asdf".to_string(), 488 action: CommitAction::Cut, 489 }, 490 &[0u8; 16], 491 )?; 492 493 commits.truncating_insert( 494 UFOsCommit { 495 cursor: Cursor::from_raw_u64(101), 496 did: Did::new("did:plc:whatever".to_string()).unwrap(), 497 rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(), 498 rev: "rev-asdg".to_string(), 499 action: CommitAction::Put(PutAction { 500 record: RawValue::from_string("{}".to_string())?, 501 is_update: false, 502 }), 503 }, 504 &[0u8; 16], 505 )?; 506 507 commits.truncating_insert( 508 UFOsCommit { 509 cursor: Cursor::from_raw_u64(102), 510 did: Did::new("did:plc:whatever".to_string()).unwrap(), 511 rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(), 512 rev: "rev-asdh".to_string(), 513 action: CommitAction::Put(PutAction { 514 record: RawValue::from_string("{}".to_string())?, 515 is_update: false, 516 }), 517 }, 518 &[0u8; 16], 519 )?; 520 521 assert_eq!(commits.creates, 2); 522 assert_eq!(commits.deletes, 1); 523 assert_eq!(commits.dids_estimate.estimate(), 1); 524 assert_eq!(commits.commits.len(), 2); 525 526 let mut found_first = false; 527 let mut found_last = false; 528 let mut found_delete = false; 529 for commit in commits.commits { 530 match commit.rev.as_ref() { 531 "rev-asdg" => { 532 found_first = true; 533 } 534 "rev-asdh" => { 535 found_last = true; 536 } 537 _ => {} 538 } 539 if let CommitAction::Cut = commit.action { 540 found_delete = true; 541 } 542 } 543 assert!(!found_first); 544 assert!(found_last); 545 assert!(found_delete); 546 547 Ok(()) 548 } 549 550 #[test] 551 fn test_truncating_insert_maxes_out_deletes() -> anyhow::Result<()> { 552 let mut commits: CollectionCommits<2> = Default::default(); 553 554 commits 555 .truncating_insert( 556 UFOsCommit { 557 cursor: Cursor::from_raw_u64(100), 558 did: Did::new("did:plc:whatever".to_string()).unwrap(), 559 rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 560 rev: "rev-asdf".to_string(), 561 action: CommitAction::Cut, 562 }, 563 &[0u8; 16], 564 ) 565 .unwrap(); 566 567 // this create will just be discarded 568 commits 569 .truncating_insert( 570 UFOsCommit { 571 cursor: Cursor::from_raw_u64(80), 572 did: Did::new("did:plc:whatever".to_string()).unwrap(), 573 rkey: RecordKey::new("rkey-asdf-zzz".to_string()).unwrap(), 574 rev: "rev-asdzzz".to_string(), 575 action: CommitAction::Put(PutAction { 576 record: RawValue::from_string("{}".to_string())?, 577 is_update: false, 578 }), 579 }, 580 &[0u8; 16], 581 ) 582 .unwrap(); 583 584 commits 585 .truncating_insert( 586 UFOsCommit { 587 cursor: Cursor::from_raw_u64(101), 588 did: Did::new("did:plc:whatever".to_string()).unwrap(), 589 rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(), 590 rev: "rev-asdg".to_string(), 591 action: CommitAction::Cut, 592 }, 593 &[0u8; 16], 594 ) 595 .unwrap(); 596 597 let res = commits.truncating_insert( 598 UFOsCommit { 599 cursor: Cursor::from_raw_u64(102), 600 did: Did::new("did:plc:whatever".to_string()).unwrap(), 601 rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(), 602 rev: "rev-asdh".to_string(), 603 action: CommitAction::Cut, 604 }, 605 &[0u8; 16], 606 ); 607 608 assert!(res.is_err()); 609 let overflowed = match res { 610 Err(BatchInsertError::BatchFull(c)) => c, 611 e => panic!("expected overflow but a different error happened: {e:?}"), 612 }; 613 assert_eq!(overflowed.rev, "rev-asdh"); 614 615 Ok(()) 616 } 617}