Tracks lexicons and how many times each has appeared on the Jetstream.
1use std::time::Duration;
2
3use anyhow::anyhow;
4use futures_util::{SinkExt, StreamExt};
5use serde::{Deserialize, Serialize};
6use smol_str::SmolStr;
7use tokio::net::TcpStream;
8use tokio_util::sync::CancellationToken;
9use tokio_websockets::{ClientBuilder, MaybeTlsStream, Message as WsMessage, WebSocketStream};
10
11use crate::error::AppResult;
12
13pub struct JetstreamClient {
14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
15 tls_connector: tokio_websockets::Connector,
16 urls: Vec<SmolStr>,
17}
18
19impl JetstreamClient {
20 pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> {
21 Ok(Self {
22 stream: None,
23 tls_connector: tokio_websockets::Connector::new()?,
24 urls: urls.into_iter().map(Into::into).collect(),
25 })
26 }
27
28 pub async fn connect(&mut self) -> AppResult<()> {
29 for uri in &self.urls {
30 let conn_result = ClientBuilder::new()
31 .connector(&self.tls_connector)
32 .uri(uri)?
33 .connect()
34 .await;
35 match conn_result {
36 Ok((stream, _)) => {
37 self.stream = Some(stream);
38 tracing::info!("connected to jetstream {}", uri);
39 return Ok(());
40 }
41 Err(err) => {
42 tracing::error!("failed to connect to jetstream {uri}: {err}");
43 }
44 };
45 }
46 Err(anyhow!("failed to connect to any jetstream server").into())
47 }
48
49 // automatically retries connection, only returning error if it fails many times
50 pub async fn read(&mut self, cancel_token: CancellationToken) -> AppResult<JetstreamEvent> {
51 let mut retry = false;
52 loop {
53 {
54 let Some(stream) = self.stream.as_mut() else {
55 return Err(anyhow!("not connected, call .connect() first").into());
56 };
57 tokio::select! {
58 res = stream.next() => match res {
59 Some(Ok(msg)) => {
60 if let Some(event) = msg
61 .as_text()
62 .and_then(|v| serde_json::from_str::<JetstreamEvent>(v).ok())
63 {
64 return Ok(event);
65 } else if msg.is_ping() {
66 let _ = stream.send(WsMessage::pong(msg.into_payload())).await;
67 } else {
68 return Err(anyhow!("unsupported message type").into());
69 }
70 }
71 Some(Err(err)) => {
72 tracing::error!("jetstream connection errored: {err}");
73 retry = true;
74 }
75 None => retry = true,
76 },
77 _ = cancel_token.cancelled() => {
78 return Err(anyhow!("cancelled").into());
79 }
80 }
81 }
82 // retry until connected
83 let mut backoff = Duration::from_secs(1);
84 while retry {
85 if backoff.as_secs() > 64 {
86 return Err(anyhow!("jetstream connection timed out").into());
87 }
88 tokio::select! {
89 res = self.connect() => if let Err(err) = res {
90 tracing::error!(
91 { retry_in = %backoff.as_secs() },
92 "couldn't retry jetstream connection: {err}",
93 );
94 tokio::time::sleep(backoff).await;
95 backoff *= 2;
96 continue;
97 },
98 _ = cancel_token.cancelled() => {
99 return Err(anyhow!("cancelled").into());
100 }
101 }
102
103 retry = false;
104 }
105 }
106 }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(untagged)]
111pub enum JetstreamEvent {
112 /// Repository commit event (create/update operations)
113 Commit {
114 /// DID of the repository that was updated
115 did: String,
116 /// Event timestamp in microseconds since Unix epoch
117 time_us: u64,
118 /// Event type identifier
119 kind: String,
120
121 #[serde(rename = "commit")]
122 /// Commit operation details
123 commit: JetstreamEventCommit,
124 },
125
126 /// Repository delete event
127 Delete {
128 /// DID of the repository that was updated
129 did: String,
130 /// Event timestamp in microseconds since Unix epoch
131 time_us: u64,
132 /// Event type identifier
133 kind: String,
134
135 #[serde(rename = "commit")]
136 /// Delete operation details
137 commit: JetstreamEventDelete,
138 },
139
140 /// Identity document update event
141 Identity {
142 /// DID whose identity was updated
143 did: String,
144 /// Event timestamp in microseconds since Unix epoch
145 time_us: u64,
146 /// Event type identifier
147 kind: String,
148
149 #[serde(rename = "identity")]
150 /// Identity document data
151 identity: serde_json::Value,
152 },
153
154 /// Account-related event
155 Account {
156 /// DID of the account
157 did: String,
158 /// Event timestamp in microseconds since Unix epoch
159 time_us: u64,
160 /// Event type identifier
161 kind: String,
162
163 #[serde(rename = "account")]
164 /// Account data
165 identity: serde_json::Value,
166 },
167}
168
169/// Repository commit operation details
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct JetstreamEventCommit {
172 /// Repository revision identifier
173 pub rev: String,
174 /// Operation type (create, update)
175 pub operation: String,
176 /// AT Protocol collection name
177 pub collection: String,
178 /// Record key within the collection
179 pub rkey: String,
180 /// Content identifier (CID) of the record
181 pub cid: String,
182 /// Record data as JSON
183 pub record: serde_json::Value,
184}
185
186/// Repository delete operation details
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct JetstreamEventDelete {
189 /// Repository revision identifier
190 pub rev: String,
191 /// Operation type (delete)
192 pub operation: String,
193 /// AT Protocol collection name
194 pub collection: String,
195 /// Record key that was deleted
196 pub rkey: String,
197}