Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at pocket 7.2 kB view raw
1use std::time::{ 2 Duration, 3 SystemTime, 4 SystemTimeError, 5 UNIX_EPOCH, 6}; 7 8use chrono::Utc; 9use serde::{ 10 Deserialize, 11 Serialize, 12}; 13use serde_json::value::RawValue; 14 15use crate::exports; 16 17/// Opaque wrapper for the time_us cursor used by jetstream 18#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)] 19pub struct Cursor(u64); 20 21#[derive(Debug, Deserialize)] 22#[serde(rename_all = "snake_case")] 23pub struct JetstreamEvent { 24 #[serde(rename = "time_us")] 25 pub cursor: Cursor, 26 pub did: exports::Did, 27 pub kind: EventKind, 28 pub commit: Option<CommitEvent>, 29 pub identity: Option<IdentityEvent>, 30 pub account: Option<AccountEvent>, 31} 32 33#[derive(Debug, Deserialize, PartialEq)] 34#[serde(rename_all = "snake_case")] 35pub enum EventKind { 36 Commit, 37 Identity, 38 Account, 39} 40 41#[derive(Debug, Deserialize)] 42#[serde(rename_all = "snake_case")] 43pub struct CommitEvent { 44 pub collection: exports::Nsid, 45 pub rkey: exports::RecordKey, 46 pub rev: String, 47 pub operation: CommitOp, 48 pub record: Option<Box<RawValue>>, 49 pub cid: Option<exports::Cid>, 50} 51 52#[derive(Debug, Deserialize, PartialEq)] 53#[serde(rename_all = "snake_case")] 54pub enum CommitOp { 55 Create, 56 Update, 57 Delete, 58} 59 60#[derive(Debug, Deserialize, PartialEq)] 61pub struct IdentityEvent { 62 pub did: exports::Did, 63 pub handle: Option<exports::Handle>, 64 pub seq: u64, 65 pub time: chrono::DateTime<Utc>, 66} 67 68#[derive(Debug, Deserialize, PartialEq)] 69pub struct AccountEvent { 70 pub active: bool, 71 pub did: exports::Did, 72 pub seq: u64, 73 pub time: chrono::DateTime<Utc>, 74 pub status: Option<String>, 75} 76 77impl Cursor { 78 /// Get a cursor that will consume all available jetstream replay 79 /// 80 /// This sets the cursor to zero. 81 /// 82 /// Jetstream instances typically only have a few days of replay. 83 pub fn from_start() -> Self { 84 Self(0) 85 } 86 /// Get a cursor for a specific time 87 /// 88 /// Panics: if t is older than the unix epoch: Jan 1, 1970. 89 /// 90 /// If you want to receive all available jetstream replay (typically a few days), use 91 /// .from_start() 92 /// 93 /// Warning: this exploits the internal implementation detail of jetstream cursors 94 /// being ~microsecond timestamps. 95 pub fn at(t: impl Into<SystemTime>) -> Self { 96 let unix_dt = t 97 .into() 98 .duration_since(UNIX_EPOCH) 99 .expect("cannot set jetstream cursor earlier than unix epoch"); 100 Self(unix_dt.as_micros() as u64) 101 } 102 /// Get a cursor rewound from now by this amount 103 /// 104 /// Panics: if d is greater than the time since the unix epoch: Jan 1, 1970. 105 /// 106 /// Jetstream instances typically only have a few days of replay. 107 /// 108 /// Warning: this exploits the internal implementation detail of jetstream cursors 109 /// being ~microsecond timestamps. 110 pub fn back_by(d: Duration) -> Self { 111 Self::at(SystemTime::now() - d) 112 } 113 /// Get a Cursor from a raw u64 114 /// 115 /// For example, from a jetstream event's `time_us` field. 116 pub fn from_raw_u64(time_us: u64) -> Self { 117 Self(time_us) 118 } 119 /// Get the raw u64 value from this cursor. 120 pub fn to_raw_u64(&self) -> u64 { 121 self.0 122 } 123 /// Format the cursor value for use in a jetstream connection url querystring 124 pub fn to_jetstream(&self) -> String { 125 self.0.to_string() 126 } 127 /// Compute the time span since an earlier cursor or [SystemTime] 128 /// 129 /// Warning: this exploits the internal implementation detail of jetstream cursors 130 /// being ~microsecond timestamps. 131 pub fn duration_since( 132 &self, 133 earlier: impl Into<SystemTime>, 134 ) -> Result<Duration, SystemTimeError> { 135 let t: SystemTime = self.into(); 136 t.duration_since(earlier.into()) 137 } 138 /// Compute the age of the cursor vs the local clock 139 /// 140 /// Warning: this exploits the internal implementation detail of jetstream cursors 141 pub fn elapsed(&self) -> Result<Duration, SystemTimeError> { 142 let t: SystemTime = self.into(); 143 t.elapsed() 144 } 145 /// Compute the age of the cursor vs the local clock 146 /// 147 /// Converts the resulting duration into an f64, which can be negative! 148 /// 149 /// Warning: this exploits the internal implementation detail of jetstream cursors 150 pub fn elapsed_micros_f64(&self) -> f64 { 151 match self.elapsed() { 152 Ok(d) => d.as_micros() as f64, 153 Err(e) => -(e.duration().as_micros() as f64), 154 } 155 } 156 /// Get the immediate next cursor value 157 /// 158 /// This is possible for the implementation of jetstream cursors 159 pub fn next(&self) -> Cursor { 160 Self(self.0 + 1) 161 } 162} 163 164impl From<&Cursor> for SystemTime { 165 /// Convert a cursor directly to a [SystemTime] 166 /// 167 /// Warning: this exploits the internal implementation detail of jetstream cursors 168 /// being ~microsecond timestamps. 169 fn from(c: &Cursor) -> Self { 170 UNIX_EPOCH + Duration::from_micros(c.0) 171 } 172} 173 174#[cfg(test)] 175mod test { 176 use super::*; 177 178 #[test] 179 fn test_parse_commit_event() -> anyhow::Result<()> { 180 let json = r#"{ 181 "rev":"3llrdsginou2i", 182 "operation":"create", 183 "collection":"app.bsky.feed.post", 184 "rkey":"3llrdsglqdc2s", 185 "cid": "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy", 186 "record": {"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"} 187 }"#; 188 let commit: CommitEvent = serde_json::from_str(json)?; 189 assert_eq!( 190 commit.cid.unwrap(), 191 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()? 192 ); 193 assert_eq!( 194 commit.record.unwrap().get(), 195 r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"# 196 ); 197 Ok(()) 198 } 199 200 #[test] 201 fn test_parse_whole_event() -> anyhow::Result<()> { 202 let json = r#"{"did":"did:plc:ai3dzf35cth7s3st7n7jsd7r","time_us":1743526687419798,"kind":"commit","commit":{"rev":"3llrdsginou2i","operation":"create","collection":"app.bsky.feed.post","rkey":"3llrdsglqdc2s","record":{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"},"cid":"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"}}"#; 203 let event: JetstreamEvent = serde_json::from_str(json)?; 204 assert_eq!(event.kind, EventKind::Commit); 205 assert!(event.commit.is_some()); 206 let commit = event.commit.unwrap(); 207 assert_eq!( 208 commit.cid.unwrap(), 209 "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()? 210 ); 211 assert_eq!( 212 commit.record.unwrap().get(), 213 r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"# 214 ); 215 Ok(()) 216 } 217}