// Forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use anyhow::{bail, Result};
2use metrics::{
3 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
4};
5use std::io::{Cursor, ErrorKind, Read};
6use std::net::ToSocketAddrs;
7use std::thread;
8use std::time;
9use tinyjson::JsonValue;
10use tokio_util::sync::CancellationToken;
11use tungstenite::{client::IntoClientRequest, Error as TError, Message};
12use zstd::dict::DecoderDictionary;
13
14const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../../zstd/dictionary");
15
16pub fn consume_jetstream(
17 sender: flume::Sender<JsonValue>,
18 cursor: Option<u64>,
19 stream: String,
20 staying_alive: CancellationToken,
21) -> Result<()> {
22 describe_counter!(
23 "jetstream_connnect",
24 Unit::Count,
25 "attempts to connect to a jetstream server"
26 );
27 describe_counter!(
28 "jetstream_read",
29 Unit::Count,
30 "attempts to read an event from jetstream"
31 );
32 describe_counter!(
33 "jetstream_read_fail",
34 Unit::Count,
35 "failures to read events from jetstream"
36 );
37 describe_counter!(
38 "jetstream_read_bytes",
39 Unit::Bytes,
40 "total received message bytes from jetstream"
41 );
42 describe_counter!(
43 "jetstream_read_bytes_decompressed",
44 Unit::Bytes,
45 "total decompressed message bytes from jetstream"
46 );
47 describe_histogram!(
48 "jetstream_read_bytes_decompressed",
49 Unit::Bytes,
50 "decompressed size of jetstream messages"
51 );
52 describe_counter!(
53 "jetstream_events",
54 Unit::Count,
55 "valid json messages received"
56 );
57 describe_histogram!(
58 "jetstream_events_queued",
59 Unit::Count,
60 "event messages waiting in queue"
61 );
62 describe_gauge!(
63 "jetstream_cursor_age",
64 Unit::Microseconds,
65 "microseconds between our clock and the jetstream event's time_us"
66 );
67
68 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
69 let mut connect_retries = 0;
70 let mut latest_cursor = cursor;
71 'outer: loop {
72 let stream_url = format!(
73 "{stream}?compress=true{}",
74 latest_cursor
75 .map(|c| {
76 println!("starting with cursor from {:?} ago...", ts_age(c));
77 format!("&cursor={c}")
78 })
79 .unwrap_or("".into())
80 );
81 let mut req = (&stream_url).into_client_request()?;
82 let ua = format!("microcosm/constellation v{}", env!("CARGO_PKG_VERSION"));
83 req.headers_mut().insert("user-agent", ua.parse()?);
84
85 let host = req.uri().host().expect("jetstream request uri has a host");
86 let port = req.uri().port().map(|p| p.as_u16()).unwrap_or(443);
87 let dest = format!("{host}:{port}");
88 let addr = match dest.to_socket_addrs().map(|mut d| d.next()) {
89 Ok(Some(a)) => a,
90 Ok(None) => {
91 eprintln!(
92 "jetstream: could not resolve an address for {dest:?}. retrying after a bit?"
93 );
94 thread::sleep(time::Duration::from_secs(15));
95 continue;
96 }
97 Err(e) => {
98 eprintln!("jetstream failed to resolve address {dest:?}: {e:?} waiting and then retrying...");
99 thread::sleep(time::Duration::from_secs(3));
100 continue;
101 }
102 };
103 let tcp_stream = match std::net::TcpStream::connect_timeout(
104 &addr,
105 time::Duration::from_secs(8),
106 ) {
107 Ok(s) => s,
108 Err(e) => {
109 eprintln!(
110 "jetstream failed to make tcp connection: {e:?}. (todo: clean up retry logic)"
111 );
112 connect_retries += 1;
113 if connect_retries >= 7 {
114 eprintln!("jetstream: no more connect retries, breaking out.");
115 break;
116 }
117 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap());
118 eprintln!("jetstream tcp failed to connect: {e:?}. backing off {backoff:?} before retrying...");
119 thread::sleep(backoff);
120 continue;
121 }
122 };
123 tcp_stream.set_read_timeout(Some(time::Duration::from_secs(4)))?;
124 tcp_stream.set_write_timeout(Some(time::Duration::from_secs(4)))?;
125
126 counter!("jetstream_connect", "url" => stream.clone(), "is_retry" => (connect_retries > 0).to_string()).increment(1);
127 println!("jetstream connecting, attempt #{connect_retries}, {stream_url:?} with user-agent: {ua:?}");
128 let mut socket = match tungstenite::client_tls(req, tcp_stream) {
129 Ok((socket, _)) => {
130 println!("jetstream connected.");
131 // connect_retries = 0; // only reset once we have received a message vvv
132 socket
133 }
134 Err(e) => {
135 connect_retries += 1;
136 if connect_retries >= 7 {
137 eprintln!("jetstream: no more connect retries, breaking out.");
138 break;
139 }
140 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap());
141 eprintln!("jetstream failed to connect: {e:?}. backing off {backoff:?} before retrying...");
142 thread::sleep(backoff);
143 continue;
144 }
145 };
146
147 loop {
148 if !socket.can_read() {
149 eprintln!("jetstream: socket says we cannot read -- flushing then breaking out.");
150 if let Err(e) = socket.flush() {
151 eprintln!("error while flushing socket: {e:?}");
152 }
153 break;
154 }
155
156 if staying_alive.is_cancelled() {
157 eprintln!("jetstream: cancelling");
158 // TODO: cleanly close the connection?
159 break 'outer;
160 }
161
162 counter!("jetstream_read").increment(1);
163 let b = match socket.read() {
164 Ok(Message::Binary(b)) => b,
165 Ok(Message::Text(_)) => {
166 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "received text")
167 .increment(1);
168 eprintln!("jetstream: unexpected text message, should be binary for compressed (ignoring)");
169 continue;
170 }
171 Ok(Message::Close(f)) => {
172 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "server closed")
173 .increment(1);
174 println!("jetstream: closing the connection: {f:?}");
175 continue;
176 }
177 Ok(Message::Ping(bytes)) => {
178 let _ = socket.send(Message::Pong(bytes));
179 continue;
180 }
181 Ok(m) => {
182 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "unexpected message", "message" => format!("{m:?}")).increment(1);
183 eprintln!("jetstream: unexpected from read (ignoring): {m:?}");
184 continue;
185 }
186 Err(TError::ConnectionClosed) => {
187 // clean exit
188 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close")
189 .increment(1);
190 println!("jetstream closed the websocket cleanly.");
191 break;
192 }
193 Err(TError::AlreadyClosed) => {
194 // programming error
195 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "already closed")
196 .increment(1);
197 eprintln!(
198 "jetstream: got AlreadyClosed trying to .read() websocket. probably a bug."
199 );
200 break;
201 }
202 Err(TError::Capacity(e)) => {
203 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "capacity error")
204 .increment(1);
205 eprintln!("jetstream: capacity error (ignoring): {e:?}");
206 continue;
207 }
208 Err(TError::Utf8) => {
209 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "utf8 error")
210 .increment(1);
211 eprintln!("jetstream: utf8 error (ignoring)");
212 continue;
213 }
214 Err(e) => {
215 eprintln!("jetstream: could not read message from socket. closing: {e:?}");
216 if let TError::Io(io_err) = e {
217 if matches!(io_err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
218 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "timed out").increment(1);
219 println!("jetstream socket timed out. bailing to reconnect -- should we be trying to close first?");
220 break;
221 }
222 }
223 match socket.close(None) {
224 Err(TError::ConnectionClosed) => {
225 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close").increment(1);
226 println!("jetstream closed the websocket cleanly.");
227 break;
228 }
229 Err(_) => {
230 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
231 println!("jetstream failed to close the websocket cleanly.");
232 break;
233 }
234 Ok(r) => {
235 eprintln!("jetstream: close result after error: {r:?}");
236 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
237 .increment(1);
238 // if we didn't immediately get ConnectionClosed, we should keep polling read
239 // until we get it.
240 continue;
241 }
242 }
243 }
244 };
245
246 counter!("jetstream_read_bytes", "url" => stream.clone()).increment(b.len() as u64);
247 let mut cursor = Cursor::new(b);
248 let mut decoder = match zstd::stream::Decoder::with_prepared_dictionary(
249 &mut cursor,
250 &dict,
251 ) {
252 Ok(d) => d,
253 Err(e) => {
254 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd decompress")
255 .increment(1);
256 eprintln!("jetstream: failed to decompress zstd message: {e:?}");
257 continue;
258 }
259 };
260
261 let mut s = String::new();
262 match decoder.read_to_string(&mut s) {
263 Ok(n) => {
264 counter!("jetstream_read_bytes_decompressed", "url" => stream.clone())
265 .increment(n as u64);
266 histogram!("jetstream_read_bytes_decompressed", "url" => stream.clone())
267 .record(n as f64);
268 }
269 Err(e) => {
270 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd string decode")
271 .increment(1);
272 eprintln!("jetstream: failed to decode zstd: {e:?}");
273 continue;
274 }
275 }
276
277 let v = match s.parse() {
278 Ok(v) => v,
279 Err(e) => {
280 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "json parse")
281 .increment(1);
282 eprintln!("jetstream: failed to parse message as json: {e:?}");
283 continue;
284 }
285 };
286
287 // bit of a hack to have this here for now...
288 let ts = match get_event_time(&v) {
289 Some(ts) => ts,
290 None => {
291 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "invalid event")
292 .increment(1);
293 eprintln!("jetstream: encountered an event without a timestamp: ignoring it.");
294 continue;
295 }
296 };
297
298 if let Err(flume::SendError(_rejected)) = sender.send(v) {
299 counter!("jetstream_events", "url" => stream.clone()).increment(1);
300 if sender.is_disconnected() {
301 eprintln!("jetstream: send channel disconnected -- nothing to do, bye.");
302 bail!("jetstream: send channel disconnected");
303 }
304 eprintln!(
305 "jetstream: failed to send on channel, dropping update! (FIXME / HANDLEME)"
306 );
307 }
308 histogram!("jetstream_events_queued", "url" => stream.clone())
309 .record(sender.len() as f64);
310
311 // only actually update our cursor after we've managed to queue the event
312 latest_cursor = Some(ts);
313 gauge!("jetstream_cursor_age", "url" => stream.clone())
314 .set(ts_age(ts).as_micros() as f64);
315
316 // great news if we got this far -- might be safe to assume the connection is up.
317 connect_retries = 0;
318 }
319 }
320 Ok(())
321}
322
323fn get_event_time(v: &JsonValue) -> Option<u64> {
324 if let JsonValue::Object(root) = v {
325 if let JsonValue::Number(time_us) = root.get("time_us")? {
326 return Some(*time_us as u64);
327 };
328 };
329 None
330}
331
/// Returns how long ago the timestamp `ts` (microseconds since the unix epoch) was,
/// measured against the system clock.
///
/// Saturates to zero when `ts` lies in the future, or is so large that it cannot be
/// represented as a `SystemTime` on this platform (plain `+` would panic in that case).
fn ts_age(ts: u64) -> time::Duration {
    time::UNIX_EPOCH
        .checked_add(time::Duration::from_micros(ts))
        .and_then(|event_time| event_time.elapsed().ok())
        .unwrap_or(time::Duration::ZERO)
}