tracks lexicons and how many times they appeared on the jetstream
at main 6.5 kB view raw
1use std::time::Duration; 2 3use anyhow::anyhow; 4use futures_util::{SinkExt, StreamExt}; 5use serde::{Deserialize, Serialize}; 6use smol_str::SmolStr; 7use tokio::net::TcpStream; 8use tokio_util::sync::CancellationToken; 9use tokio_websockets::{ClientBuilder, MaybeTlsStream, Message as WsMessage, WebSocketStream}; 10 11use crate::error::AppResult; 12 13pub struct JetstreamClient { 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 tls_connector: tokio_websockets::Connector, 16 urls: Vec<SmolStr>, 17} 18 19impl JetstreamClient { 20 pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> { 21 Ok(Self { 22 stream: None, 23 tls_connector: tokio_websockets::Connector::new()?, 24 urls: urls.into_iter().map(Into::into).collect(), 25 }) 26 } 27 28 pub async fn connect(&mut self) -> AppResult<()> { 29 for uri in &self.urls { 30 let conn_result = ClientBuilder::new() 31 .connector(&self.tls_connector) 32 .uri(uri)? 33 .connect() 34 .await; 35 match conn_result { 36 Ok((stream, _)) => { 37 self.stream = Some(stream); 38 tracing::info!("connected to jetstream {}", uri); 39 return Ok(()); 40 } 41 Err(err) => { 42 tracing::error!("failed to connect to jetstream {uri}: {err}"); 43 } 44 }; 45 } 46 Err(anyhow!("failed to connect to any jetstream server").into()) 47 } 48 49 // automatically retries connection, only returning error if it fails many times 50 pub async fn read(&mut self, cancel_token: CancellationToken) -> AppResult<JetstreamEvent> { 51 let mut retry = false; 52 loop { 53 { 54 let Some(stream) = self.stream.as_mut() else { 55 return Err(anyhow!("not connected, call .connect() first").into()); 56 }; 57 tokio::select! { 58 res = stream.next() => match res { 59 Some(Ok(msg)) => { 60 if let Some(event) = msg 61 .as_text() 62 .and_then(|v| serde_json::from_str::<JetstreamEvent>(v).ok()) 63 { 64 return Ok(event); 65 } else if msg.is_ping() { 66 let _ = stream.send(WsMessage::pong(msg.into_payload())).await; 67 } else { 68 return Err(anyhow!("unsupported message type").into()); 69 } 70 } 71 Some(Err(err)) => { 72 tracing::error!("jetstream connection errored: {err}"); 73 retry = true; 74 } 75 None => retry = true, 76 }, 77 _ = cancel_token.cancelled() => { 78 return Err(anyhow!("cancelled").into()); 79 } 80 } 81 } 82 // retry until connected 83 let mut backoff = Duration::from_secs(1); 84 while retry { 85 if backoff.as_secs() > 64 { 86 return Err(anyhow!("jetstream connection timed out").into()); 87 } 88 tokio::select! { 89 res = self.connect() => if let Err(err) = res { 90 tracing::error!( 91 { retry_in = %backoff.as_secs() }, 92 "couldn't retry jetstream connection: {err}", 93 ); 94 tokio::time::sleep(backoff).await; 95 backoff *= 2; 96 continue; 97 }, 98 _ = cancel_token.cancelled() => { 99 return Err(anyhow!("cancelled").into()); 100 } 101 } 102 103 retry = false; 104 } 105 } 106 } 107} 108 109#[derive(Debug, Clone, Serialize, Deserialize)] 110#[serde(untagged)] 111pub enum JetstreamEvent { 112 /// Repository commit event (create/update operations) 113 Commit { 114 /// DID of the repository that was updated 115 did: String, 116 /// Event timestamp in microseconds since Unix epoch 117 time_us: u64, 118 /// Event type identifier 119 kind: String, 120 121 #[serde(rename = "commit")] 122 /// Commit operation details 123 commit: JetstreamEventCommit, 124 }, 125 126 /// Repository delete event 127 Delete { 128 /// DID of the repository that was updated 129 did: String, 130 /// Event timestamp in microseconds since Unix epoch 131 time_us: u64, 132 /// Event type identifier 133 kind: String, 134 135 #[serde(rename = "commit")] 136 /// Delete operation details 137 commit: JetstreamEventDelete, 138 }, 139 140 /// Identity document update event 141 Identity { 142 /// DID whose identity was updated 143 did: String, 144 /// Event timestamp in microseconds since Unix epoch 145 time_us: u64, 146 /// Event type identifier 147 kind: String, 148 149 #[serde(rename = "identity")] 150 /// Identity document data 151 identity: serde_json::Value, 152 }, 153 154 /// Account-related event 155 Account { 156 /// DID of the account 157 did: String, 158 /// Event timestamp in microseconds since Unix epoch 159 time_us: u64, 160 /// Event type identifier 161 kind: String, 162 163 #[serde(rename = "account")] 164 /// Account data 165 identity: serde_json::Value, 166 }, 167} 168 169/// Repository commit operation details 170#[derive(Debug, Clone, Serialize, Deserialize)] 171pub struct JetstreamEventCommit { 172 /// Repository revision identifier 173 pub rev: String, 174 /// Operation type (create, update) 175 pub operation: String, 176 /// AT Protocol collection name 177 pub collection: String, 178 /// Record key within the collection 179 pub rkey: String, 180 /// Content identifier (CID) of the record 181 pub cid: String, 182 /// Record data as JSON 183 pub record: serde_json::Value, 184} 185 186/// Repository delete operation details 187#[derive(Debug, Clone, Serialize, Deserialize)] 188pub struct JetstreamEventDelete { 189 /// Repository revision identifier 190 pub rev: String, 191 /// Operation type (delete) 192 pub operation: String, 193 /// AT Protocol collection name 194 pub collection: String, 195 /// Record key that was deleted 196 pub rkey: String, 197}