forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use anyhow::{bail, Result};
2use metrics::{
3 counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
4};
5use std::io::{Cursor, ErrorKind, Read};
6use std::net::ToSocketAddrs;
7use std::thread;
8use std::time;
9use tinyjson::JsonValue;
10use tokio_util::sync::CancellationToken;
11use tungstenite::{client::IntoClientRequest, Error as TError, Message};
12use zstd::dict::DecoderDictionary;
13
14const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../../zstd/dictionary");
15
16pub fn consume_jetstream(
17 sender: flume::Sender<JsonValue>,
18 cursor: Option<u64>,
19 stream: String,
20 staying_alive: CancellationToken,
21) -> Result<()> {
22 describe_counter!(
23 "jetstream_connnect",
24 Unit::Count,
25 "attempts to connect to a jetstream server"
26 );
27 describe_counter!(
28 "jetstream_read",
29 Unit::Count,
30 "attempts to read an event from jetstream"
31 );
32 describe_counter!(
33 "jetstream_read_fail",
34 Unit::Count,
35 "failures to read events from jetstream"
36 );
37 describe_counter!(
38 "jetstream_read_bytes",
39 Unit::Bytes,
40 "total received message bytes from jetstream"
41 );
42 describe_counter!(
43 "jetstream_read_bytes_decompressed",
44 Unit::Bytes,
45 "total decompressed message bytes from jetstream"
46 );
47 describe_histogram!(
48 "jetstream_read_bytes_decompressed",
49 Unit::Bytes,
50 "decompressed size of jetstream messages"
51 );
52 describe_counter!(
53 "jetstream_events",
54 Unit::Count,
55 "valid json messages received"
56 );
57 describe_histogram!(
58 "jetstream_events_queued",
59 Unit::Count,
60 "event messages waiting in queue"
61 );
62 describe_gauge!(
63 "jetstream_cursor_age",
64 Unit::Microseconds,
65 "microseconds between our clock and the jetstream event's time_us"
66 );
67
68 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
69 let mut connect_retries = 0;
70 let mut latest_cursor = cursor;
71 'outer: loop {
72 let stream_url = format!(
73 "{stream}?compress=true{}",
74 latest_cursor
75 .map(|c| {
76 println!("starting with cursor from {:?} ago...", ts_age(c));
77 format!("&cursor={c}")
78 })
79 .unwrap_or("".into())
80 );
81 let mut req = (&stream_url).into_client_request()?;
82 let ua = format!("microcosm/constellation v{}", env!("CARGO_PKG_VERSION"));
83 req.headers_mut().insert("user-agent", ua.parse()?);
84
85 let host = req.uri().host().expect("jetstream request uri has a host");
86 let port = req.uri().port().map(|p| p.as_u16()).unwrap_or(443);
87 let dest = format!("{host}:{port}");
88 let addr = match dest.to_socket_addrs().map(|mut d| d.next()) {
89 Ok(Some(a)) => a,
90 Ok(None) => {
91 eprintln!(
92 "jetstream: could not resolve an address for {dest:?}. retrying after a bit?"
93 );
94 thread::sleep(time::Duration::from_secs(15));
95 continue;
96 }
97 Err(e) => {
98 eprintln!("jetstream failed to resolve address {dest:?}: {e:?} waiting and then retrying...");
99 thread::sleep(time::Duration::from_secs(3));
100 continue;
101 }
102 };
103 let tcp_stream = match std::net::TcpStream::connect_timeout(
104 &addr,
105 time::Duration::from_secs(8),
106 ) {
107 Ok(s) => s,
108 Err(e) => {
109 eprintln!(
110 "jetstream failed to make tcp connection: {e:?}. (todo: clean up retry logic)"
111 );
112 connect_retries += 1;
113 if connect_retries >= 7 {
114 eprintln!("jetstream: no more connect retries, breaking out.");
115 break;
116 }
117 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap());
118 eprintln!("jetstream tcp failed to connect: {e:?}. backing off {backoff:?} before retrying...");
119 thread::sleep(backoff);
120 continue;
121 }
122 };
123 tcp_stream.set_read_timeout(Some(time::Duration::from_secs(4)))?;
124 tcp_stream.set_write_timeout(Some(time::Duration::from_secs(4)))?;
125
126 counter!("jetstream_connect", "url" => stream.clone(), "is_retry" => (connect_retries > 0).to_string()).increment(1);
127 println!("jetstream connecting, attempt #{connect_retries}, {stream_url:?} with user-agent: {ua:?}");
128 let mut socket = match tungstenite::client_tls(req, tcp_stream) {
129 Ok((socket, _)) => {
130 println!("jetstream connected.");
131 // connect_retries = 0; // only reset once we have received a message vvv
132 socket
133 }
134 Err(e) => {
135 connect_retries += 1;
136 if connect_retries >= 7 {
137 eprintln!("jetstream: no more connect retries, breaking out.");
138 break;
139 }
140 let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap());
141 eprintln!("jetstream failed to connect: {e:?}. backing off {backoff:?} before retrying...");
142 thread::sleep(backoff);
143 continue;
144 }
145 };
146
147 loop {
148 if !socket.can_read() {
149 eprintln!("jetstream: socket says we cannot read -- flushing then breaking out.");
150 if let Err(e) = socket.flush() {
151 eprintln!("error while flushing socket: {e:?}");
152 }
153 break;
154 }
155
156 if staying_alive.is_cancelled() {
157 eprintln!("jetstream: cancelling");
158 // TODO: cleanly close the connection?
159 break 'outer;
160 }
161
162 counter!("jetstream_read").increment(1);
163 let b = match socket.read() {
164 Ok(Message::Binary(b)) => b,
165 Ok(Message::Text(_)) => {
166 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "received text")
167 .increment(1);
168 eprintln!("jetstream: unexpected text message, should be binary for compressed (ignoring)");
169 continue;
170 }
171 Ok(Message::Close(f)) => {
172 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "server closed")
173 .increment(1);
174 println!("jetstream: closing the connection: {f:?}");
175 continue;
176 }
177 Ok(Message::Ping(bytes)) => {
178 let _ = socket.send(Message::Pong(bytes));
179 continue;
180 }
181 Ok(m) => {
182 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "unexpected message", "message" => format!("{m:?}")).increment(1);
183 eprintln!("jetstream: unexpected from read (ignoring): {m:?}");
184 continue;
185 }
186 Err(TError::ConnectionClosed) => {
187 // clean exit
188 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close")
189 .increment(1);
190 println!("jetstream closed the websocket cleanly.");
191 break;
192 }
193 Err(TError::AlreadyClosed) => {
194 // programming error
195 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "already closed")
196 .increment(1);
197 eprintln!(
198 "jetstream: got AlreadyClosed trying to .read() websocket. probably a bug."
199 );
200 break;
201 }
202 Err(TError::Capacity(e)) => {
203 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "capacity error")
204 .increment(1);
205 eprintln!("jetstream: capacity error (ignoring): {e:?}");
206 continue;
207 }
208 Err(TError::Utf8) => {
209 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "utf8 error")
210 .increment(1);
211 eprintln!("jetstream: utf8 error (ignoring)");
212 continue;
213 }
214 Err(e) => {
215 eprintln!("jetstream: could not read message from socket. closing: {e:?}");
216 if let TError::Io(io_err) = e {
217 if matches!(io_err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
218 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "timed out").increment(1);
219 println!("jetstream socket timed out. bailing to reconnect -- should we be trying to close first?");
220 break;
221 }
222 }
223 match socket.close(None) {
224 Err(TError::ConnectionClosed) => {
225 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close").increment(1);
226 println!("jetstream closed the websocket cleanly.");
227 break;
228 }
229 r => eprintln!("jetstream: close result after error: {r:?}"),
230 }
231 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
232 .increment(1);
233 // if we didn't immediately get ConnectionClosed, we should keep polling read
234 // until we get it.
235 continue;
236 }
237 };
238
239 counter!("jetstream_read_bytes", "url" => stream.clone()).increment(b.len() as u64);
240 let mut cursor = Cursor::new(b);
241 let mut decoder = match zstd::stream::Decoder::with_prepared_dictionary(
242 &mut cursor,
243 &dict,
244 ) {
245 Ok(d) => d,
246 Err(e) => {
247 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd decompress")
248 .increment(1);
249 eprintln!("jetstream: failed to decompress zstd message: {e:?}");
250 continue;
251 }
252 };
253
254 let mut s = String::new();
255 match decoder.read_to_string(&mut s) {
256 Ok(n) => {
257 counter!("jetstream_read_bytes_decompressed", "url" => stream.clone())
258 .increment(n as u64);
259 histogram!("jetstream_read_bytes_decompressed", "url" => stream.clone())
260 .record(n as f64);
261 }
262 Err(e) => {
263 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "zstd string decode")
264 .increment(1);
265 eprintln!("jetstream: failed to decode zstd: {e:?}");
266 continue;
267 }
268 }
269
270 let v = match s.parse() {
271 Ok(v) => v,
272 Err(e) => {
273 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "json parse")
274 .increment(1);
275 eprintln!("jetstream: failed to parse message as json: {e:?}");
276 continue;
277 }
278 };
279
280 // bit of a hack to have this here for now...
281 let ts = match get_event_time(&v) {
282 Some(ts) => ts,
283 None => {
284 counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "invalid event")
285 .increment(1);
286 eprintln!("jetstream: encountered an event without a timestamp: ignoring it.");
287 continue;
288 }
289 };
290
291 if let Err(flume::SendError(_rejected)) = sender.send(v) {
292 counter!("jetstream_events", "url" => stream.clone()).increment(1);
293 if sender.is_disconnected() {
294 eprintln!("jetstream: send channel disconnected -- nothing to do, bye.");
295 bail!("jetstream: send channel disconnected");
296 }
297 eprintln!(
298 "jetstream: failed to send on channel, dropping update! (FIXME / HANDLEME)"
299 );
300 }
301 histogram!("jetstream_events_queued", "url" => stream.clone())
302 .record(sender.len() as f64);
303
304 // only actually update our cursor after we've managed to queue the event
305 latest_cursor = Some(ts);
306 gauge!("jetstream_cursor_age", "url" => stream.clone())
307 .set(ts_age(ts).as_micros() as f64);
308
309 // great news if we got this far -- might be safe to assume the connection is up.
310 connect_retries = 0;
311 }
312 }
313 Ok(())
314}
315
316fn get_event_time(v: &JsonValue) -> Option<u64> {
317 if let JsonValue::Object(root) = v {
318 if let JsonValue::Number(time_us) = root.get("time_us")? {
319 return Some(*time_us as u64);
320 };
321 };
322 None
323}
324
/// How long ago `ts` (microseconds since the Unix epoch) was, by our system clock.
///
/// Saturates to zero when `ts` is later than our clock. It also saturates to zero
/// when `ts` is so far in the future that it cannot be represented as a
/// `SystemTime`, instead of panicking on the overflowing addition. `ts` comes from
/// server-supplied event data, so that case must not crash the consumer.
fn ts_age(ts: u64) -> time::Duration {
    time::UNIX_EPOCH
        .checked_add(time::Duration::from_micros(ts))
        .and_then(|event_time| event_time.elapsed().ok())
        .unwrap_or(time::Duration::ZERO)
}