forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use std::time::{
2 Duration,
3 SystemTime,
4 SystemTimeError,
5 UNIX_EPOCH,
6};
7
8use chrono::Utc;
9use serde::{
10 Deserialize,
11 Serialize,
12};
13use serde_json::value::RawValue;
14
15use crate::exports;
16
17/// Opaque wrapper for the time_us cursor used by jetstream
18#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)]
19pub struct Cursor(u64);
20
21#[derive(Debug, Deserialize)]
22#[serde(rename_all = "snake_case")]
23pub struct JetstreamEvent {
24 #[serde(rename = "time_us")]
25 pub cursor: Cursor,
26 pub did: exports::Did,
27 pub kind: EventKind,
28 pub commit: Option<CommitEvent>,
29 pub identity: Option<IdentityEvent>,
30 pub account: Option<AccountEvent>,
31}
32
33#[derive(Debug, Deserialize, PartialEq)]
34#[serde(rename_all = "snake_case")]
35pub enum EventKind {
36 Commit,
37 Identity,
38 Account,
39}
40
41#[derive(Debug, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub struct CommitEvent {
44 pub collection: exports::Nsid,
45 pub rkey: exports::RecordKey,
46 pub rev: String,
47 pub operation: CommitOp,
48 pub record: Option<Box<RawValue>>,
49 pub cid: Option<exports::Cid>,
50}
51
52#[derive(Debug, Deserialize, PartialEq)]
53#[serde(rename_all = "snake_case")]
54pub enum CommitOp {
55 Create,
56 Update,
57 Delete,
58}
59
60#[derive(Debug, Deserialize, PartialEq)]
61pub struct IdentityEvent {
62 pub did: exports::Did,
63 pub handle: Option<exports::Handle>,
64 pub seq: u64,
65 pub time: chrono::DateTime<Utc>,
66}
67
68#[derive(Debug, Deserialize, PartialEq)]
69pub struct AccountEvent {
70 pub active: bool,
71 pub did: exports::Did,
72 pub seq: u64,
73 pub time: chrono::DateTime<Utc>,
74 pub status: Option<String>,
75}
76
77impl Cursor {
78 /// Get a cursor that will consume all available jetstream replay
79 ///
80 /// This sets the cursor to zero.
81 ///
82 /// Jetstream instances typically only have a few days of replay.
83 pub fn from_start() -> Self {
84 Self(0)
85 }
86 /// Get a cursor for a specific time
87 ///
88 /// Panics: if t is older than the unix epoch: Jan 1, 1970.
89 ///
90 /// If you want to receive all available jetstream replay (typically a few days), use
91 /// .from_start()
92 ///
93 /// Warning: this exploits the internal implementation detail of jetstream cursors
94 /// being ~microsecond timestamps.
95 pub fn at(t: impl Into<SystemTime>) -> Self {
96 let unix_dt = t
97 .into()
98 .duration_since(UNIX_EPOCH)
99 .expect("cannot set jetstream cursor earlier than unix epoch");
100 Self(unix_dt.as_micros() as u64)
101 }
102 /// Get a cursor rewound from now by this amount
103 ///
104 /// Panics: if d is greater than the time since the unix epoch: Jan 1, 1970.
105 ///
106 /// Jetstream instances typically only have a few days of replay.
107 ///
108 /// Warning: this exploits the internal implementation detail of jetstream cursors
109 /// being ~microsecond timestamps.
110 pub fn back_by(d: Duration) -> Self {
111 Self::at(SystemTime::now() - d)
112 }
113 /// Get a Cursor from a raw u64
114 ///
115 /// For example, from a jetstream event's `time_us` field.
116 pub fn from_raw_u64(time_us: u64) -> Self {
117 Self(time_us)
118 }
119 /// Get the raw u64 value from this cursor.
120 pub fn to_raw_u64(&self) -> u64 {
121 self.0
122 }
123 /// Format the cursor value for use in a jetstream connection url querystring
124 pub fn to_jetstream(&self) -> String {
125 self.0.to_string()
126 }
127 /// Compute the time span since an earlier cursor or [SystemTime]
128 ///
129 /// Warning: this exploits the internal implementation detail of jetstream cursors
130 /// being ~microsecond timestamps.
131 pub fn duration_since(
132 &self,
133 earlier: impl Into<SystemTime>,
134 ) -> Result<Duration, SystemTimeError> {
135 let t: SystemTime = self.into();
136 t.duration_since(earlier.into())
137 }
138 /// Compute the age of the cursor vs the local clock
139 ///
140 /// Warning: this exploits the internal implementation detail of jetstream cursors
141 pub fn elapsed(&self) -> Result<Duration, SystemTimeError> {
142 let t: SystemTime = self.into();
143 t.elapsed()
144 }
145 /// Compute the age of the cursor vs the local clock
146 ///
147 /// Converts the resulting duration into an f64, which can be negative!
148 ///
149 /// Warning: this exploits the internal implementation detail of jetstream cursors
150 pub fn elapsed_micros_f64(&self) -> f64 {
151 match self.elapsed() {
152 Ok(d) => d.as_micros() as f64,
153 Err(e) => -(e.duration().as_micros() as f64),
154 }
155 }
156 /// Get the immediate next cursor value
157 ///
158 /// This is possible for the implementation of jetstream cursors
159 pub fn next(&self) -> Cursor {
160 Self(self.0 + 1)
161 }
162}
163
164impl From<&Cursor> for SystemTime {
165 /// Convert a cursor directly to a [SystemTime]
166 ///
167 /// Warning: this exploits the internal implementation detail of jetstream cursors
168 /// being ~microsecond timestamps.
169 fn from(c: &Cursor) -> Self {
170 UNIX_EPOCH + Duration::from_micros(c.0)
171 }
172}
173
174#[cfg(test)]
175mod test {
176 use super::*;
177
178 #[test]
179 fn test_parse_commit_event() -> anyhow::Result<()> {
180 let json = r#"{
181 "rev":"3llrdsginou2i",
182 "operation":"create",
183 "collection":"app.bsky.feed.post",
184 "rkey":"3llrdsglqdc2s",
185 "cid": "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy",
186 "record": {"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}
187 }"#;
188 let commit: CommitEvent = serde_json::from_str(json)?;
189 assert_eq!(
190 commit.cid.unwrap(),
191 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
192 );
193 assert_eq!(
194 commit.record.unwrap().get(),
195 r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
196 );
197 Ok(())
198 }
199
200 #[test]
201 fn test_parse_whole_event() -> anyhow::Result<()> {
202 let json = r#"{"did":"did:plc:ai3dzf35cth7s3st7n7jsd7r","time_us":1743526687419798,"kind":"commit","commit":{"rev":"3llrdsginou2i","operation":"create","collection":"app.bsky.feed.post","rkey":"3llrdsglqdc2s","record":{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"},"cid":"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"}}"#;
203 let event: JetstreamEvent = serde_json::from_str(json)?;
204 assert_eq!(event.kind, EventKind::Commit);
205 assert!(event.commit.is_some());
206 let commit = event.commit.unwrap();
207 assert_eq!(
208 commit.cid.unwrap(),
209 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
210 );
211 assert_eq!(
212 commit.record.unwrap().get(),
213 r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
214 );
215 Ok(())
216 }
217}