tracks lexicons and how many times they appeared on the jetstream
at migrate 6.1 kB view raw
1use std::time::Duration; 2 3use anyhow::anyhow; 4use futures_util::{SinkExt, StreamExt}; 5use serde::{Deserialize, Serialize}; 6use smol_str::SmolStr; 7use tokio::net::TcpStream; 8use tokio_util::sync::CancellationToken; 9use tokio_websockets::{ClientBuilder, MaybeTlsStream, Message as WsMessage, WebSocketStream}; 10 11use crate::error::AppResult; 12 13pub struct JetstreamClient { 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 tls_connector: tokio_websockets::Connector, 16 url: SmolStr, 17} 18 19impl JetstreamClient { 20 pub fn new(url: &str) -> AppResult<Self> { 21 Ok(Self { 22 stream: None, 23 tls_connector: tokio_websockets::Connector::new()?, 24 url: SmolStr::new(url), 25 }) 26 } 27 28 pub async fn connect(&mut self) -> AppResult<()> { 29 let (stream, _) = ClientBuilder::new() 30 .connector(&self.tls_connector) 31 .uri(&self.url)? 32 .connect() 33 .await?; 34 self.stream = Some(stream); 35 tracing::info!("connected to jetstream ({})", self.url); 36 Ok(()) 37 } 38 39 // automatically retries connection, only returning error if it fails many times 40 pub async fn read(&mut self, cancel_token: CancellationToken) -> AppResult<JetstreamEvent> { 41 let mut retry = false; 42 loop { 43 { 44 let Some(stream) = self.stream.as_mut() else { 45 return Err(anyhow!("not connected, call .connect() first").into()); 46 }; 47 tokio::select! { 48 res = stream.next() => match res { 49 Some(Ok(msg)) => { 50 if let Some(event) = msg 51 .as_text() 52 .and_then(|v| serde_json::from_str::<JetstreamEvent>(v).ok()) 53 { 54 return Ok(event); 55 } else if msg.is_ping() { 56 let _ = stream.send(WsMessage::pong(msg.into_payload())).await; 57 } else { 58 return Err(anyhow!("unsupported message type").into()); 59 } 60 } 61 Some(Err(err)) => { 62 tracing::error!("jetstream connection errored: {err}"); 63 retry = true; 64 } 65 None => retry = true, 66 }, 67 _ = cancel_token.cancelled() => { 68 return Err(anyhow!("cancelled").into()); 69 } 70 } 71 } 72 // retry until connected 73 let mut backoff = Duration::from_secs(1); 74 while retry { 75 if backoff.as_secs() > 64 { 76 return Err(anyhow!("jetstream connection timed out").into()); 77 } 78 tokio::select! { 79 res = self.connect() => if let Err(err) = res { 80 tracing::error!( 81 { retry_in = %backoff.as_secs() }, 82 "couldn't retry jetstream connection: {err}", 83 ); 84 tokio::time::sleep(backoff).await; 85 backoff *= 2; 86 continue; 87 }, 88 _ = cancel_token.cancelled() => { 89 return Err(anyhow!("cancelled").into()); 90 } 91 } 92 93 retry = false; 94 } 95 } 96 } 97} 98 99#[derive(Debug, Clone, Serialize, Deserialize)] 100#[serde(untagged)] 101pub enum JetstreamEvent { 102 /// Repository commit event (create/update operations) 103 Commit { 104 /// DID of the repository that was updated 105 did: String, 106 /// Event timestamp in microseconds since Unix epoch 107 time_us: u64, 108 /// Event type identifier 109 kind: String, 110 111 #[serde(rename = "commit")] 112 /// Commit operation details 113 commit: JetstreamEventCommit, 114 }, 115 116 /// Repository delete event 117 Delete { 118 /// DID of the repository that was updated 119 did: String, 120 /// Event timestamp in microseconds since Unix epoch 121 time_us: u64, 122 /// Event type identifier 123 kind: String, 124 125 #[serde(rename = "commit")] 126 /// Delete operation details 127 commit: JetstreamEventDelete, 128 }, 129 130 /// Identity document update event 131 Identity { 132 /// DID whose identity was updated 133 did: String, 134 /// Event timestamp in microseconds since Unix epoch 135 time_us: u64, 136 /// Event type identifier 137 kind: String, 138 139 #[serde(rename = "identity")] 140 /// Identity document data 141 identity: serde_json::Value, 142 }, 143 144 /// Account-related event 145 Account { 146 /// DID of the account 147 did: String, 148 /// Event timestamp in microseconds since Unix epoch 149 time_us: u64, 150 /// Event type identifier 151 kind: String, 152 153 #[serde(rename = "account")] 154 /// Account data 155 identity: serde_json::Value, 156 }, 157} 158 159/// Repository commit operation details 160#[derive(Debug, Clone, Serialize, Deserialize)] 161pub struct JetstreamEventCommit { 162 /// Repository revision identifier 163 pub rev: String, 164 /// Operation type (create, update) 165 pub operation: String, 166 /// AT Protocol collection name 167 pub collection: String, 168 /// Record key within the collection 169 pub rkey: String, 170 /// Content identifier (CID) of the record 171 pub cid: String, 172 /// Record data as JSON 173 pub record: serde_json::Value, 174} 175 176/// Repository delete operation details 177#[derive(Debug, Clone, Serialize, Deserialize)] 178pub struct JetstreamEventDelete { 179 /// Repository revision identifier 180 pub rev: String, 181 /// Operation type (delete) 182 pub operation: String, 183 /// AT Protocol collection name 184 pub collection: String, 185 /// Record key that was deleted 186 pub rkey: String, 187}