// Tracks lexicons and how many times each one has appeared on the Jetstream.
1use std::time::Duration;
2
3use anyhow::anyhow;
4use futures_util::{SinkExt, StreamExt};
5use serde::{Deserialize, Serialize};
6use smol_str::SmolStr;
7use tokio::net::TcpStream;
8use tokio_util::sync::CancellationToken;
9use tokio_websockets::{ClientBuilder, MaybeTlsStream, Message as WsMessage, WebSocketStream};
10
11use crate::error::AppResult;
12
13pub struct JetstreamClient {
14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
15 tls_connector: tokio_websockets::Connector,
16 url: SmolStr,
17}
18
19impl JetstreamClient {
20 pub fn new(url: &str) -> AppResult<Self> {
21 Ok(Self {
22 stream: None,
23 tls_connector: tokio_websockets::Connector::new()?,
24 url: SmolStr::new(url),
25 })
26 }
27
28 pub async fn connect(&mut self) -> AppResult<()> {
29 let (stream, _) = ClientBuilder::new()
30 .connector(&self.tls_connector)
31 .uri(&self.url)?
32 .connect()
33 .await?;
34 self.stream = Some(stream);
35 tracing::info!("connected to jetstream ({})", self.url);
36 Ok(())
37 }
38
39 // automatically retries connection, only returning error if it fails many times
40 pub async fn read(&mut self, cancel_token: CancellationToken) -> AppResult<JetstreamEvent> {
41 let mut retry = false;
42 loop {
43 {
44 let Some(stream) = self.stream.as_mut() else {
45 return Err(anyhow!("not connected, call .connect() first").into());
46 };
47 tokio::select! {
48 res = stream.next() => match res {
49 Some(Ok(msg)) => {
50 if let Some(event) = msg
51 .as_text()
52 .and_then(|v| serde_json::from_str::<JetstreamEvent>(v).ok())
53 {
54 return Ok(event);
55 } else if msg.is_ping() {
56 let _ = stream.send(WsMessage::pong(msg.into_payload())).await;
57 } else {
58 return Err(anyhow!("unsupported message type").into());
59 }
60 }
61 Some(Err(err)) => {
62 tracing::error!("jetstream connection errored: {err}");
63 retry = true;
64 }
65 None => retry = true,
66 },
67 _ = cancel_token.cancelled() => {
68 return Err(anyhow!("cancelled").into());
69 }
70 }
71 }
72 // retry until connected
73 let mut backoff = Duration::from_secs(1);
74 while retry {
75 if backoff.as_secs() > 64 {
76 return Err(anyhow!("jetstream connection timed out").into());
77 }
78 tokio::select! {
79 res = self.connect() => if let Err(err) = res {
80 tracing::error!(
81 { retry_in = %backoff.as_secs() },
82 "couldn't retry jetstream connection: {err}",
83 );
84 tokio::time::sleep(backoff).await;
85 backoff *= 2;
86 continue;
87 },
88 _ = cancel_token.cancelled() => {
89 return Err(anyhow!("cancelled").into());
90 }
91 }
92
93 retry = false;
94 }
95 }
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(untagged)]
101pub enum JetstreamEvent {
102 /// Repository commit event (create/update operations)
103 Commit {
104 /// DID of the repository that was updated
105 did: String,
106 /// Event timestamp in microseconds since Unix epoch
107 time_us: u64,
108 /// Event type identifier
109 kind: String,
110
111 #[serde(rename = "commit")]
112 /// Commit operation details
113 commit: JetstreamEventCommit,
114 },
115
116 /// Repository delete event
117 Delete {
118 /// DID of the repository that was updated
119 did: String,
120 /// Event timestamp in microseconds since Unix epoch
121 time_us: u64,
122 /// Event type identifier
123 kind: String,
124
125 #[serde(rename = "commit")]
126 /// Delete operation details
127 commit: JetstreamEventDelete,
128 },
129
130 /// Identity document update event
131 Identity {
132 /// DID whose identity was updated
133 did: String,
134 /// Event timestamp in microseconds since Unix epoch
135 time_us: u64,
136 /// Event type identifier
137 kind: String,
138
139 #[serde(rename = "identity")]
140 /// Identity document data
141 identity: serde_json::Value,
142 },
143
144 /// Account-related event
145 Account {
146 /// DID of the account
147 did: String,
148 /// Event timestamp in microseconds since Unix epoch
149 time_us: u64,
150 /// Event type identifier
151 kind: String,
152
153 #[serde(rename = "account")]
154 /// Account data
155 identity: serde_json::Value,
156 },
157}
158
159/// Repository commit operation details
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct JetstreamEventCommit {
162 /// Repository revision identifier
163 pub rev: String,
164 /// Operation type (create, update)
165 pub operation: String,
166 /// AT Protocol collection name
167 pub collection: String,
168 /// Record key within the collection
169 pub rkey: String,
170 /// Content identifier (CID) of the record
171 pub cid: String,
172 /// Record data as JSON
173 pub record: serde_json::Value,
174}
175
176/// Repository delete operation details
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct JetstreamEventDelete {
179 /// Repository revision identifier
180 pub rev: String,
181 /// Operation type (delete)
182 pub operation: String,
183 /// AT Protocol collection name
184 pub collection: String,
185 /// Record key that was deleted
186 pub rkey: String,
187}