forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod consumer;
2pub mod db_types;
3pub mod error;
4pub mod file_consumer;
5pub mod index_html;
6pub mod server;
7pub mod storage;
8pub mod storage_fjall;
9pub mod store_types;
10
11use crate::db_types::{EncodingError, EncodingResult};
12use crate::error::BatchInsertError;
13use crate::store_types::{CountsValue, SketchSecretPrefix};
14use cardinality_estimator_safe::{Element, Sketch};
15use error::FirehoseEventError;
16use jetstream::events::{CommitEvent, CommitOp, Cursor};
17use jetstream::exports::{Did, Nsid, RecordKey};
18use schemars::JsonSchema;
19use serde::Serialize;
20use serde_json::value::RawValue;
21use sha2::Sha256;
22use std::collections::HashMap;
23use std::time::Duration;
24
25fn did_element(sketch_secret: &SketchSecretPrefix, did: &Did) -> Element<14> {
26 Element::from_digest_with_prefix::<Sha256>(sketch_secret, did.as_bytes())
27}
28
/// Formats a duration as a short human-readable string.
///
/// - under one second: whole milliseconds, e.g. `250ms`
/// - under one minute: seconds with two decimals, e.g. `1.50s`
/// - otherwise: whole seconds split into minute/hour/day components, e.g.
///   `1m30s`, `1h1m1s`, `1d1h1m1s`
///
/// For durations of a minute or more, the total is rounded to the nearest
/// whole second *before* it is split into components, so no component can be
/// rendered as `60` (119.6s is `2m0s`, never `1m60s`).
pub fn nice_duration(dt: Duration) -> String {
    let secs = dt.as_secs_f64();
    if secs < 1. {
        return format!("{:.0}ms", secs * 1000.);
    }
    if secs < 60. {
        return format!("{secs:.02}s");
    }

    // Round half-up to whole seconds using integer arithmetic; saturate so
    // that `Duration::MAX` cannot overflow.
    let round_up = u64::from(dt.subsec_nanos() >= 500_000_000);
    let total_secs = dt.as_secs().saturating_add(round_up);

    let rsecs = total_secs % 60;
    let total_mins = total_secs / 60;
    if total_mins < 60 {
        return format!("{total_mins}m{rsecs}s");
    }

    let rmins = total_mins % 60;
    let total_hrs = total_mins / 60;
    if total_hrs < 24 {
        return format!("{total_hrs}h{rmins}m{rsecs}s");
    }

    let rhrs = total_hrs % 24;
    let days = total_hrs / 24;
    format!("{days}d{rhrs}h{rmins}m{rsecs}s")
}
51
52#[derive(Debug, Default, Clone)]
53pub struct CollectionCommits<const LIMIT: usize> {
54 pub creates: usize,
55 pub updates: usize,
56 pub deletes: usize,
57 pub dids_estimate: Sketch<14>,
58 pub commits: Vec<UFOsCommit>,
59 head: usize,
60}
61
62impl<const LIMIT: usize> CollectionCommits<LIMIT> {
63 fn advance_head(&mut self) {
64 self.head += 1;
65 if self.head > LIMIT {
66 self.head = 0;
67 }
68 }
69 /// lossy-ish commit insertion
70 ///
71 /// - new commits are *always* added to the batch or else rejected as full.
72 /// - when LIMIT is reached, new commits can displace existing `creates`.
73 /// `update`s and `delete`s are *never* displaced.
74 /// - if all batched `creates` have been displaced, the batch is full.
75 ///
76 /// in general it's rare for commits to be displaced except for very high-
77 /// volume collections such as `app.bsky.feed.like`.
78 ///
79 /// it could be nice in the future to retain all batched commits and just
80 /// drop new `creates` after a limit instead.
81 pub fn truncating_insert(
82 &mut self,
83 commit: UFOsCommit,
84 sketch_secret: &SketchSecretPrefix,
85 ) -> Result<(), BatchInsertError> {
86 if (self.updates + self.deletes) == LIMIT {
87 // nothing can be displaced (only `create`s may be displaced)
88 return Err(BatchInsertError::BatchFull(commit));
89 }
90
91 // every kind of commit counts as "user activity"
92 self.dids_estimate
93 .insert(did_element(sketch_secret, &commit.did));
94
95 match commit.action {
96 CommitAction::Put(PutAction {
97 is_update: false, ..
98 }) => {
99 self.creates += 1;
100 }
101 CommitAction::Put(PutAction {
102 is_update: true, ..
103 }) => {
104 self.updates += 1;
105 }
106 CommitAction::Cut => {
107 self.deletes += 1;
108 }
109 }
110
111 if self.commits.len() < LIMIT {
112 // normal insert: there's space left to put a new commit at the end
113 self.commits.push(commit);
114 } else {
115 // displacement insert: find an old `create` we can displace
116 let head_started_at = self.head;
117 loop {
118 let candidate = self
119 .commits
120 .get_mut(self.head)
121 .ok_or(BatchInsertError::BatchOverflow(self.head))?;
122 if candidate.action.is_create() {
123 *candidate = commit;
124 break;
125 }
126 self.advance_head();
127 if self.head == head_started_at {
128 return Err(BatchInsertError::BatchForever);
129 }
130 }
131 }
132
133 Ok(())
134 }
135}
136
137#[derive(Debug, Clone)]
138pub struct DeleteAccount {
139 pub did: Did,
140 pub cursor: Cursor,
141}
142
143#[derive(Debug, Clone)]
144pub enum CommitAction {
145 Put(PutAction),
146 Cut,
147}
148impl CommitAction {
149 pub fn is_create(&self) -> bool {
150 match self {
151 CommitAction::Put(PutAction { is_update, .. }) => !is_update,
152 CommitAction::Cut => false,
153 }
154 }
155}
156
157#[derive(Debug, Clone)]
158pub struct PutAction {
159 record: Box<RawValue>,
160 is_update: bool,
161}
162
163#[derive(Debug, Clone)]
164pub struct UFOsCommit {
165 cursor: Cursor,
166 did: Did,
167 rkey: RecordKey,
168 rev: String,
169 action: CommitAction,
170}
171
172#[derive(Debug, Clone, Serialize)]
173pub struct UFOsRecord {
174 pub cursor: Cursor,
175 pub did: Did,
176 pub collection: Nsid,
177 pub rkey: RecordKey,
178 pub rev: String,
179 // TODO: cid?
180 pub record: Box<RawValue>,
181 pub is_update: bool,
182}
183
184impl UFOsCommit {
185 pub fn from_commit_info(
186 commit: CommitEvent,
187 did: Did,
188 cursor: Cursor,
189 ) -> Result<(Self, Nsid), FirehoseEventError> {
190 let action = match commit.operation {
191 CommitOp::Delete => CommitAction::Cut,
192 cru => CommitAction::Put(PutAction {
193 record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?,
194 is_update: cru == CommitOp::Update,
195 }),
196 };
197 let batched = Self {
198 cursor,
199 did,
200 rkey: commit.rkey,
201 rev: commit.rev,
202 action,
203 };
204 Ok((batched, commit.collection))
205 }
206}
207
208#[derive(Debug, Default, Clone)]
209pub struct EventBatch<const LIMIT: usize> {
210 pub commits_by_nsid: HashMap<Nsid, CollectionCommits<LIMIT>>,
211 pub account_removes: Vec<DeleteAccount>,
212}
213
214impl<const LIMIT: usize> EventBatch<LIMIT> {
215 pub fn insert_commit_by_nsid(
216 &mut self,
217 collection: &Nsid,
218 commit: UFOsCommit,
219 max_collections: usize,
220 sketch_secret: &SketchSecretPrefix,
221 ) -> Result<(), BatchInsertError> {
222 let map = &mut self.commits_by_nsid;
223 if !map.contains_key(collection) && map.len() >= max_collections {
224 return Err(BatchInsertError::BatchFull(commit));
225 }
226 map.entry(collection.clone())
227 .or_default()
228 .truncating_insert(commit, sketch_secret)?;
229 Ok(())
230 }
231 pub fn total_collections(&self) -> usize {
232 self.commits_by_nsid.len()
233 }
234 pub fn account_removes(&self) -> usize {
235 self.account_removes.len()
236 }
237 pub fn estimate_dids(&self) -> usize {
238 let mut estimator = Sketch::<14>::default();
239 for commits in self.commits_by_nsid.values() {
240 estimator.merge(&commits.dids_estimate);
241 }
242 estimator.estimate()
243 }
244 pub fn latest_cursor(&self) -> Option<Cursor> {
245 let mut oldest = Cursor::from_start();
246 for commits in self.commits_by_nsid.values() {
247 for commit in &commits.commits {
248 if commit.cursor > oldest {
249 oldest = commit.cursor;
250 }
251 }
252 }
253 if let Some(del) = self.account_removes.last() {
254 if del.cursor > oldest {
255 oldest = del.cursor;
256 }
257 }
258 if oldest > Cursor::from_start() {
259 Some(oldest)
260 } else {
261 None
262 }
263 }
264 pub fn is_empty(&self) -> bool {
265 self.commits_by_nsid.is_empty() && self.account_removes.is_empty()
266 }
267}
268
269#[derive(Debug, Serialize, JsonSchema)]
270#[serde(rename_all = "camelCase")]
271pub enum ConsumerInfo {
272 Jetstream {
273 endpoint: String,
274 started_at: u64,
275 latest_cursor: Option<u64>,
276 rollup_cursor: Option<u64>,
277 },
278}
279
280#[derive(Debug, PartialEq, Serialize, JsonSchema)]
281pub struct NsidCount {
282 nsid: String,
283 creates: u64,
284 updates: u64,
285 deletes: u64,
286 dids_estimate: u64,
287}
288impl NsidCount {
289 pub fn new(nsid: &Nsid, counts: &CountsValue) -> Self {
290 let crud = counts.counts();
291 Self {
292 nsid: nsid.to_string(),
293 creates: crud.creates,
294 updates: crud.updates,
295 deletes: crud.deletes,
296 dids_estimate: counts.dids().estimate() as u64,
297 }
298 }
299}
300
301#[derive(Debug, PartialEq, Serialize, JsonSchema)]
302pub struct PrefixCount {
303 prefix: String,
304 creates: u64,
305 updates: u64,
306 deletes: u64,
307 dids_estimate: u64,
308}
309impl PrefixCount {
310 pub fn new(prefix: &str, counts: &CountsValue) -> Self {
311 let crud = counts.counts();
312 Self {
313 prefix: prefix.to_string(),
314 creates: crud.creates,
315 updates: crud.updates,
316 deletes: crud.deletes,
317 dids_estimate: counts.dids().estimate() as u64,
318 }
319 }
320}
321
322#[derive(Debug, PartialEq, Serialize, JsonSchema)]
323#[serde(tag = "type", rename_all = "camelCase")]
324pub enum PrefixChild {
325 Collection(NsidCount),
326 Prefix(PrefixCount),
327}
328
329#[derive(Debug, Serialize, JsonSchema)]
330pub struct NsidPrefix(String);
331impl NsidPrefix {
332 /// Input must not include a trailing dot.
333 pub fn new(pre: &str) -> EncodingResult<Self> {
334 // it's a valid prefix if appending `.name` makes it a valid NSID
335 Nsid::new(format!("{pre}.name")).map_err(EncodingError::BadAtriumStringType)?;
336 // hack (shouldn't really be here): reject prefixes that aren't at least 2 segments long
337 if !pre.contains('.') {
338 return Err(EncodingError::NotEnoughNsidSegments);
339 }
340 Ok(Self(pre.to_string()))
341 }
342 pub fn is_group_of(&self, other: &Nsid) -> bool {
343 assert!(
344 other.as_str().starts_with(&self.0),
345 "must be a prefix of other"
346 );
347 self.0 == other.domain_authority()
348 }
349 /// The prefix as initialized (no trailing dot)
350 pub fn as_str(&self) -> &str {
351 self.0.as_str()
352 }
353 /// The prefix with a trailing `.` appended to avoid matching a longer segment
354 pub fn terminated(&self) -> String {
355 format!("{}.", self.0)
356 }
357}
358
359#[derive(Debug, Serialize, JsonSchema)]
360pub struct JustCount {
361 creates: u64,
362 updates: u64,
363 deletes: u64,
364 dids_estimate: u64,
365}
366
/// How a listing of collections should be ordered.
#[derive(Debug)]
pub enum OrderCollectionsBy {
    /// Lexicographic order, with an optional opaque cursor to continue from.
    Lexi { cursor: Option<Vec<u8>> },
    /// Order by number of records created.
    RecordsCreated,
    /// Order by estimated number of distinct DIDs.
    DidsEstimate,
}
373impl Default for OrderCollectionsBy {
374 fn default() -> Self {
375 Self::Lexi { cursor: None }
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Sketch secret shared by every insert in these tests.
    const SECRET: [u8; 16] = [0u8; 16];

    /// A `Put` action with an empty JSON object as its record.
    fn put(is_update: bool) -> anyhow::Result<CommitAction> {
        Ok(CommitAction::Put(PutAction {
            record: RawValue::from_string("{}".to_string())?,
            is_update,
        }))
    }

    /// A commit from the fixed test DID with the given cursor, rkey, rev and action.
    fn commit(cursor: u64, rkey: &str, rev: &str, action: CommitAction) -> UFOsCommit {
        UFOsCommit {
            cursor: Cursor::from_raw_u64(cursor),
            did: Did::new("did:plc:whatever".to_string()).unwrap(),
            rkey: RecordKey::new(rkey.to_string()).unwrap(),
            rev: rev.to_string(),
            action,
        }
    }

    #[test]
    fn test_truncating_insert_truncates() -> anyhow::Result<()> {
        let mut commits: CollectionCommits<2> = Default::default();

        // three creates into a batch that only holds two
        for (cursor, rkey, rev) in [
            (100, "rkey-asdf-a", "rev-asdf"),
            (101, "rkey-asdf-b", "rev-asdg"),
            (102, "rkey-asdf-c", "rev-asdh"),
        ] {
            commits.truncating_insert(commit(cursor, rkey, rev, put(false)?), &SECRET)?;
        }

        assert_eq!(commits.creates, 3);
        assert_eq!(commits.dids_estimate.estimate(), 1);
        assert_eq!(commits.commits.len(), 2);

        let has_rev = |rev: &str| commits.commits.iter().any(|c| c.rev == rev);
        // the first create was displaced by the last one
        assert!(!has_rev("rev-asdf"));
        assert!(has_rev("rev-asdh"));

        Ok(())
    }

    #[test]
    fn test_truncating_insert_counts_updates() -> anyhow::Result<()> {
        let mut commits: CollectionCommits<2> = Default::default();

        commits.truncating_insert(
            commit(100, "rkey-asdf-a", "rev-asdf", put(true)?),
            &SECRET,
        )?;

        assert_eq!(commits.creates, 0);
        assert_eq!(commits.updates, 1);
        assert_eq!(commits.deletes, 0);
        assert_eq!(commits.dids_estimate.estimate(), 1);
        assert_eq!(commits.commits.len(), 1);
        Ok(())
    }

    #[test]
    fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
        let mut commits: CollectionCommits<2> = Default::default();

        commits.truncating_insert(
            commit(100, "rkey-asdf-a", "rev-asdf", CommitAction::Cut),
            &SECRET,
        )?;
        commits.truncating_insert(
            commit(101, "rkey-asdf-b", "rev-asdg", put(false)?),
            &SECRET,
        )?;
        commits.truncating_insert(
            commit(102, "rkey-asdf-c", "rev-asdh", put(false)?),
            &SECRET,
        )?;

        assert_eq!(commits.creates, 2);
        assert_eq!(commits.deletes, 1);
        assert_eq!(commits.dids_estimate.estimate(), 1);
        assert_eq!(commits.commits.len(), 2);

        let has_rev = |rev: &str| commits.commits.iter().any(|c| c.rev == rev);
        let has_delete = commits
            .commits
            .iter()
            .any(|c| matches!(c.action, CommitAction::Cut));
        // the first create was displaced; the delete was kept
        assert!(!has_rev("rev-asdg"));
        assert!(has_rev("rev-asdh"));
        assert!(has_delete);

        Ok(())
    }

    #[test]
    fn test_truncating_insert_maxes_out_deletes() -> anyhow::Result<()> {
        let mut commits: CollectionCommits<2> = Default::default();

        commits
            .truncating_insert(
                commit(100, "rkey-asdf-a", "rev-asdf", CommitAction::Cut),
                &SECRET,
            )
            .unwrap();

        // this create will just be discarded
        commits
            .truncating_insert(
                commit(80, "rkey-asdf-zzz", "rev-asdzzz", put(false)?),
                &SECRET,
            )
            .unwrap();

        commits
            .truncating_insert(
                commit(101, "rkey-asdf-b", "rev-asdg", CommitAction::Cut),
                &SECRET,
            )
            .unwrap();

        // two deletes now fill the batch, so a third must be rejected
        let res = commits.truncating_insert(
            commit(102, "rkey-asdf-c", "rev-asdh", CommitAction::Cut),
            &SECRET,
        );

        assert!(res.is_err());
        let overflowed = match res {
            Err(BatchInsertError::BatchFull(c)) => c,
            e => panic!("expected overflow but a different error happened: {e:?}"),
        };
        assert_eq!(overflowed.rev, "rev-asdh");

        Ok(())
    }
}