Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use anyhow::{bail, Result}; 2use metrics::{ 3 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit, 4}; 5use std::io::{Cursor, ErrorKind, Read}; 6use std::net::ToSocketAddrs; 7use std::thread; 8use std::time; 9use tinyjson::JsonValue; 10use tokio_util::sync::CancellationToken; 11use tungstenite::{client::IntoClientRequest, Error as TError, Message}; 12use zstd::dict::DecoderDictionary; 13 14const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../../zstd/dictionary"); 15 16pub fn consume_jetstream( 17 sender: flume::Sender<JsonValue>, 18 cursor: Option<u64>, 19 stream: String, 20 staying_alive: CancellationToken, 21) -> Result<()> { 22 describe_counter!( 23 "jetstream_connnect", 24 Unit::Count, 25 "attempts to connect to a jetstream server" 26 ); 27 describe_counter!( 28 "jetstream_read", 29 Unit::Count, 30 "attempts to read an event from jetstream" 31 ); 32 describe_counter!( 33 "jetstream_read_fail", 34 Unit::Count, 35 "failures to read events from jetstream" 36 ); 37 describe_counter!( 38 "jetstream_read_bytes", 39 Unit::Bytes, 40 "total received message bytes from jetstream" 41 ); 42 describe_counter!( 43 "jetstream_read_bytes_decompressed", 44 Unit::Bytes, 45 "total decompressed message bytes from jetstream" 46 ); 47 describe_histogram!( 48 "jetstream_read_bytes_decompressed", 49 Unit::Bytes, 50 "decompressed size of jetstream messages" 51 ); 52 describe_counter!( 53 "jetstream_events", 54 Unit::Count, 55 "valid json messages received" 56 ); 57 describe_histogram!( 58 "jetstream_events_queued", 59 Unit::Count, 60 "event messages waiting in queue" 61 ); 62 describe_gauge!( 63 "jetstream_cursor_age", 64 Unit::Microseconds, 65 "microseconds between our clock and the jetstream event's time_us" 66 ); 67 68 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 69 let mut connect_retries = 0; 70 let mut latest_cursor = cursor; 71 'outer: loop { 72 let stream_url = format!( 73 "{stream}?compress=true{}", 74 latest_cursor 75 .map(|c| { 76 println!("starting with cursor from {:?} ago...", ts_age(c)); 77 format!("&cursor={c}") 78 }) 79 .unwrap_or("".into()) 80 ); 81 let mut req = (&stream_url).into_client_request()?; 82 let ua = format!("microcosm/constellation v{}", env!("CARGO_PKG_VERSION")); 83 req.headers_mut().insert("user-agent", ua.parse()?); 84 85 let host = req.uri().host().expect("jetstream request uri has a host"); 86 let port = req.uri().port().map(|p| p.as_u16()).unwrap_or(443); 87 let dest = format!("{host}:{port}"); 88 let addr = match dest.to_socket_addrs().map(|mut d| d.next()) { 89 Ok(Some(a)) => a, 90 Ok(None) => { 91 eprintln!( 92 "jetstream: could not resolve an address for {dest:?}. retrying after a bit?" 93 ); 94 thread::sleep(time::Duration::from_secs(15)); 95 continue; 96 } 97 Err(e) => { 98 eprintln!("jetstream failed to resolve address {dest:?}: {e:?} waiting and then retrying..."); 99 thread::sleep(time::Duration::from_secs(3)); 100 continue; 101 } 102 }; 103 let tcp_stream = match std::net::TcpStream::connect_timeout( 104 &addr, 105 time::Duration::from_secs(8), 106 ) { 107 Ok(s) => s, 108 Err(e) => { 109 eprintln!( 110 "jetstream failed to make tcp connection: {e:?}. (todo: clean up retry logic)" 111 ); 112 connect_retries += 1; 113 if connect_retries >= 7 { 114 eprintln!("jetstream: no more connect retries, breaking out."); 115 break; 116 } 117 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap()); 118 eprintln!("jetstream tcp failed to connect: {e:?}. backing off {backoff:?} before retrying..."); 119 thread::sleep(backoff); 120 continue; 121 } 122 }; 123 tcp_stream.set_read_timeout(Some(time::Duration::from_secs(4)))?; 124 tcp_stream.set_write_timeout(Some(time::Duration::from_secs(4)))?; 125 126 counter!("jetstream_connect", "url" => stream.clone(), "is_retry" => (connect_retries > 0).to_string()).increment(1); 127 println!("jetstream connecting, attempt #{connect_retries}, {stream_url:?} with user-agent: {ua:?}"); 128 let mut socket = match tungstenite::client_tls(req, tcp_stream) { 129 Ok((socket, _)) => { 130 println!("jetstream connected."); 131 // connect_retries = 0; // only reset once we have received a message vvv 132 socket 133 } 134 Err(e) => { 135 connect_retries += 1; 136 if connect_retries >= 7 { 137 eprintln!("jetstream: no more connect retries, breaking out."); 138 break; 139 } 140 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap()); 141 eprintln!("jetstream failed to connect: {e:?}. backing off {backoff:?} before retrying..."); 142 thread::sleep(backoff); 143 continue; 144 } 145 }; 146 147 loop { 148 if !socket.can_read() { 149 eprintln!("jetstream: socket says we cannot read -- flushing then breaking out."); 150 if let Err(e) = socket.flush() { 151 eprintln!("error while flushing socket: {e:?}"); 152 } 153 break; 154 } 155 156 if staying_alive.is_cancelled() { 157 eprintln!("jetstream: cancelling"); 158 // TODO: cleanly close the connection? 159 break 'outer; 160 } 161 162 counter!("jetstream_read").increment(1); 163 let b = match socket.read() { 164 Ok(Message::Binary(b)) => b, 165 Ok(Message::Text(_)) => { 166 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "received text") 167 .increment(1); 168 eprintln!("jetstream: unexpected text message, should be binary for compressed (ignoring)"); 169 continue; 170 } 171 Ok(Message::Close(f)) => { 172 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "server closed") 173 .increment(1); 174 println!("jetstream: closing the connection: {f:?}"); 175 continue; 176 } 177 Ok(Message::Ping(bytes)) => { 178 let _ = socket.send(Message::Pong(bytes)); 179 continue; 180 } 181 Ok(m) => { 182 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "unexpected message", "message" => format!("{m:?}")).increment(1); 183 eprintln!("jetstream: unexpected from read (ignoring): {m:?}"); 184 continue; 185 } 186 Err(TError::ConnectionClosed) => { 187 // clean exit 188 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close") 189 .increment(1); 190 println!("jetstream closed the websocket cleanly."); 191 break; 192 } 193 Err(TError::AlreadyClosed) => { 194 // programming error 195 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "already closed") 196 .increment(1); 197 eprintln!( 198 "jetstream: got AlreadyClosed trying to .read() websocket. probably a bug." 199 ); 200 break; 201 } 202 Err(TError::Capacity(e)) => { 203 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "capacity error") 204 .increment(1); 205 eprintln!("jetstream: capacity error (ignoring): {e:?}"); 206 continue; 207 } 208 Err(TError::Utf8) => { 209 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "utf8 error") 210 .increment(1); 211 eprintln!("jetstream: utf8 error (ignoring)"); 212 continue; 213 } 214 Err(e) => { 215 eprintln!("jetstream: could not read message from socket. closing: {e:?}"); 216 if let TError::Io(io_err) = e { 217 if matches!(io_err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) { 218 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "timed out").increment(1); 219 println!("jetstream socket timed out. bailing to reconnect -- should we be trying to close first?"); 220 break; 221 } 222 } 223 match socket.close(None) { 224 Err(TError::ConnectionClosed) => { 225 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close").increment(1); 226 println!("jetstream closed the websocket cleanly."); 227 break; 228 } 229 r => eprintln!("jetstream: close result after error: {r:?}"), 230 } 231 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error") 232 .increment(1); 233 // if we didn't immediately get ConnectionClosed, we should keep polling read 234 // until we get it. 235 continue; 236 } 237 }; 238 239 counter!("jetstream_read_bytes", "url" => stream.clone()).increment(b.len() as u64); 240 let mut cursor = Cursor::new(b); 241 let mut decoder = match zstd::stream::Decoder::with_prepared_dictionary( 242 &mut cursor, 243 &dict, 244 ) { 245 Ok(d) => d, 246 Err(e) => { 247 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd decompress") 248 .increment(1); 249 eprintln!("jetstream: failed to decompress zstd message: {e:?}"); 250 continue; 251 } 252 }; 253 254 let mut s = String::new(); 255 match decoder.read_to_string(&mut s) { 256 Ok(n) => { 257 counter!("jetstream_read_bytes_decompressed", "url" => stream.clone()) 258 .increment(n as u64); 259 histogram!("jetstream_read_bytes_decompressed", "url" => stream.clone()) 260 .record(n as f64); 261 } 262 Err(e) => { 263 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd string decode") 264 .increment(1); 265 eprintln!("jetstream: failed to decode zstd: {e:?}"); 266 continue; 267 } 268 } 269 270 let v = match s.parse() { 271 Ok(v) => v, 272 Err(e) => { 273 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "json parse") 274 .increment(1); 275 eprintln!("jetstream: failed to parse message as json: {e:?}"); 276 continue; 277 } 278 }; 279 280 // bit of a hack to have this here for now... 281 let ts = match get_event_time(&v) { 282 Some(ts) => ts, 283 None => { 284 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "invalid event") 285 .increment(1); 286 eprintln!("jetstream: encountered an event without a timestamp: ignoring it."); 287 continue; 288 } 289 }; 290 291 if let Err(flume::SendError(_rejected)) = sender.send(v) { 292 counter!("jetstream_events", "url" => stream.clone()).increment(1); 293 if sender.is_disconnected() { 294 eprintln!("jetstream: send channel disconnected -- nothing to do, bye."); 295 bail!("jetstream: send channel disconnected"); 296 } 297 eprintln!( 298 "jetstream: failed to send on channel, dropping update! (FIXME / HANDLEME)" 299 ); 300 } 301 histogram!("jetstream_events_queued", "url" => stream.clone()) 302 .record(sender.len() as f64); 303 304 // only actually update our cursor after we've managed to queue the event 305 latest_cursor = Some(ts); 306 gauge!("jetstream_cursor_age", "url" => stream.clone()) 307 .set(ts_age(ts).as_micros() as f64); 308 309 // great news if we got this far -- might be safe to assume the connection is up. 310 connect_retries = 0; 311 } 312 } 313 Ok(()) 314} 315 316fn get_event_time(v: &JsonValue) -> Option<u64> { 317 if let JsonValue::Object(root) = v { 318 if let JsonValue::Number(time_us) = root.get("time_us")? { 319 return Some(*time_us as u64); 320 }; 321 }; 322 None 323} 324 325fn ts_age(ts: u64) -> time::Duration { 326 (time::UNIX_EPOCH + time::Duration::from_micros(ts)) 327 .elapsed() 328 .unwrap_or(time::Duration::from_secs(0)) // saturate zero if ts > our system time 329}