// Forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod error;
2pub mod events;
3pub mod exports;
4
5use std::{
6 io::Cursor as IoCursor,
7 time::{
8 Duration,
9 Instant,
10 },
11};
12
13use futures_util::{
14 stream::StreamExt,
15 SinkExt,
16};
17#[cfg(feature = "metrics")]
18use metrics::{
19 counter,
20 describe_counter,
21 Unit,
22};
23use tokio::{
24 net::TcpStream,
25 sync::mpsc::{
26 channel,
27 Receiver,
28 Sender,
29 },
30 time::timeout,
31};
32use tokio_tungstenite::{
33 connect_async,
34 tungstenite::{
35 client::{
36 ClientRequestBuilder,
37 IntoClientRequest,
38 },
39 handshake::client::Request,
40 Message,
41 },
42 MaybeTlsStream,
43 WebSocketStream,
44};
45use url::Url;
46use zstd::dict::DecoderDictionary;
47
48use crate::{
49 error::{
50 ConfigValidationError,
51 ConnectionError,
52 JetstreamEventError,
53 },
54 events::{
55 Cursor,
56 JetstreamEvent,
57 },
58};
59
/// The Jetstream endpoints officially provided by Bluesky themselves.
///
/// There are no guarantees that these endpoints will always be available, but you are free
/// to run your own Jetstream instance in any case.
///
/// A variant converts into its WebSocket URL via `String::from` / `.into()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultJetstreamEndpoints {
    /// `jetstream1.us-east.bsky.network`
    USEastOne,
    /// `jetstream2.us-east.bsky.network`
    USEastTwo,
    /// `jetstream1.us-west.bsky.network`
    USWestOne,
    /// `jetstream2.us-west.bsky.network`
    USWestTwo,
}

impl DefaultJetstreamEndpoints {
    /// Helper to reference official jetstream instances by shortcut.
    ///
    /// This function passes a jetstream endpoint URL through unchanged unless it matches a
    /// shortcut, in which case it is rewritten to the corresponding bluesky-operated jetstream
    /// endpoint URL.
    ///
    /// The shortcuts available are
    /// - 'us-east-1'
    /// - 'us-east-2'
    /// - 'us-west-1'
    /// - 'us-west-2'
    pub fn endpoint_or_shortcut(s: &str) -> String {
        let endpoint = match s {
            "us-east-1" => Self::USEastOne,
            "us-east-2" => Self::USEastTwo,
            "us-west-1" => Self::USWestOne,
            "us-west-2" => Self::USWestTwo,
            // Not a shortcut: treat it as a full endpoint URL.
            custom => return custom.to_owned(),
        };
        endpoint.subscribe_url().to_owned()
    }

    /// The `wss://…/subscribe` URL of this official endpoint (single source of truth).
    fn subscribe_url(self) -> &'static str {
        match self {
            Self::USEastOne => "wss://jetstream1.us-east.bsky.network/subscribe",
            Self::USEastTwo => "wss://jetstream2.us-east.bsky.network/subscribe",
            Self::USWestOne => "wss://jetstream1.us-west.bsky.network/subscribe",
            Self::USWestTwo => "wss://jetstream2.us-west.bsky.network/subscribe",
        }
    }
}

impl From<DefaultJetstreamEndpoints> for String {
    fn from(endpoint: DefaultJetstreamEndpoints) -> Self {
        endpoint.subscribe_url().to_owned()
    }
}
116
/// Upper bound on `wantedCollections` entries a single Jetstream connection may request.
const MAX_WANTED_COLLECTIONS: usize = 100;
/// Upper bound on `wantedDids` entries a single Jetstream connection may request.
const MAX_WANTED_DIDS: usize = 10_000;
121
122/// The custom `zstd` dictionary used for decoding compressed Jetstream messages.
123///
124/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models)
125const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
126
127/// A receiver channel for consuming Jetstream events.
128pub type JetstreamReceiver = Receiver<JetstreamEvent>;
129
130/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
131type JetstreamSender = Sender<JetstreamEvent>;
132
133/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
134/// receive and consume events. See [JetstreamConnector::connect] for more info.
135pub struct JetstreamConnector {
136 /// The configuration for the Jetstream connection.
137 config: JetstreamConfig,
138}
139
/// Compression to request for (and decode from) the Jetstream WebSocket stream.
///
/// Converts to and from `bool`, where `true` means [JetstreamCompression::Zstd].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JetstreamCompression {
    /// No compression, just raw plaintext JSON.
    None,
    /// Use the `zstd` compression algorithm, which can result in ~56% smaller messages on
    /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info.
    Zstd,
}

impl From<JetstreamCompression> for bool {
    fn from(compression: JetstreamCompression) -> Self {
        match compression {
            JetstreamCompression::None => false,
            JetstreamCompression::Zstd => true,
        }
    }
}

impl From<bool> for JetstreamCompression {
    fn from(compress: bool) -> Self {
        if compress {
            Self::Zstd
        } else {
            Self::None
        }
    }
}
166
167pub struct JetstreamConfig {
168 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
169 /// `wss://jetstream1.us-east.bsky.network/subscribe`.
170 pub endpoint: String,
171 /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for.
172 ///
173 /// An empty list will receive events for *all* collections.
174 ///
175 /// Regardless of desired collections, all subscribers receive
176 /// [AccountEvent](events::account::AccountEvent) and
177 /// [IdentityEvent](events::identity::Identity) events.
178 pub wanted_collections: Vec<exports::Nsid>,
179 /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for.
180 ///
181 /// An empty list will receive events for *all* repos, which is a lot of events!
182 pub wanted_dids: Vec<exports::Did>,
183 /// The compression algorithm to request and use for the WebSocket connection (if any).
184 pub compression: JetstreamCompression,
185 /// User agent string to include with the jetstream connection request
186 pub user_agent: Option<String>,
187 /// Do not append jetstream client info to user agent string
188 pub omit_user_agent_jetstream_info: bool,
189 /// Enable automatic cursor for auto-reconnect
190 ///
191 /// By default, reconnects will never set a cursor for the connection, so a small number of
192 /// events will always be dropped.
193 ///
194 /// If you want gapless playback across reconnects, set this to `true`. If you always want
195 /// the latest available events and can tolerate missing some: `false`.
196 pub replay_on_reconnect: bool,
197 /// Maximum size of send channel for jetstream events.
198 ///
199 /// If your consuming task can't keep up with every new jetstream event in real-time,
200 /// you might get disconnected from the server as a "slow consumer". Increasing channel_size
201 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory
202 /// usage while events are buffered.
203 pub channel_size: usize,
204 /// How long since the last jetstream message before we consider the connection dead
205 ///
206 /// Default: 15s
207 pub liveliness_ttl: Duration,
208}
209
210impl Default for JetstreamConfig {
211 fn default() -> Self {
212 JetstreamConfig {
213 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
214 wanted_collections: Vec::new(),
215 wanted_dids: Vec::new(),
216 compression: JetstreamCompression::None,
217 user_agent: None,
218 omit_user_agent_jetstream_info: false,
219 replay_on_reconnect: false,
220 channel_size: 4096, // a few seconds of firehose buffer
221 liveliness_ttl: Duration::from_secs(15),
222 }
223 }
224}
225
226impl JetstreamConfig {
227 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
228 pub fn get_request_builder(
229 &self,
230 ) -> impl Fn(Option<Cursor>) -> Result<Request, ConnectionError> {
231 let did_search_query = self
232 .wanted_dids
233 .iter()
234 .map(|s| ("wantedDids", s.to_string()));
235
236 let collection_search_query = self
237 .wanted_collections
238 .iter()
239 .map(|s| ("wantedCollections", s.to_string()));
240
241 let compression = (
242 "compress",
243 match self.compression {
244 JetstreamCompression::None => "false".to_owned(),
245 JetstreamCompression::Zstd => "true".to_owned(),
246 },
247 );
248
249 let base_params = did_search_query
250 .chain(collection_search_query)
251 .chain(std::iter::once(compression))
252 .collect::<Vec<(&'static str, String)>>();
253
254 let ua_info: Option<String> = if self.omit_user_agent_jetstream_info {
255 None
256 } else {
257 Some(format!(
258 "v{} via jetstream-oxide (microcosm/links fork)",
259 env!("CARGO_PKG_VERSION")
260 ))
261 };
262 let maybe_ua = match (&self.user_agent, ua_info) {
263 (Some(ua), Some(info)) => Some(format!("{ua} {info}")),
264 (Some(ua), None) => Some(ua.clone()),
265 (None, Some(info)) => Some(info.clone()),
266 (None, None) => None,
267 };
268
269 let endpoint = self.endpoint.clone();
270 move |maybe_cursor: Option<Cursor>| {
271 let mut params = base_params.clone();
272 if let Some(ref cursor) = maybe_cursor {
273 params.push(("cursor", cursor.to_jetstream()));
274 }
275 let url = Url::parse_with_params(&endpoint, params)?;
276
277 let mut req = ClientRequestBuilder::new(url.as_str().parse()?);
278 if let Some(ua) = &maybe_ua {
279 req = req.with_header("user-agent", ua)
280 };
281 Ok(req.into_client_request()?)
282 }
283 }
284
285 /// Validates the configuration to make sure it is within the limits of the Jetstream API.
286 ///
287 /// # Constants
288 /// The following constants are used to validate the configuration and should only be changed
289 /// if the Jetstream API has itself changed.
290 /// - [MAX_WANTED_COLLECTIONS]
291 /// - [MAX_WANTED_DIDS]
292 ///
293 /// # Endpoint
294 ///
295 /// The provided `endpoint` is attempted to be parsed so that any errors occur early.
296 pub fn validate(&self) -> Result<(), ConfigValidationError> {
297 let collections = self.wanted_collections.len();
298 let dids = self.wanted_dids.len();
299
300 if collections > MAX_WANTED_COLLECTIONS {
301 return Err(ConfigValidationError::TooManyWantedCollections(collections));
302 }
303
304 if dids > MAX_WANTED_DIDS {
305 return Err(ConfigValidationError::TooManyDids(dids));
306 }
307
308 let _ = self.endpoint.parse::<Url>()?;
309
310 Ok(())
311 }
312}
313
#[cfg(feature = "metrics")]
fn describe_metrics() {
    // (metric name, description) pairs; every jetstream metric is a plain count.
    const COUNTERS: [(&str, &str); 6] = [
        ("jetstream_connects", "how many times we've tried to connect"),
        ("jetstream_disconnects", "how many times we've been disconnected"),
        ("jetstream_total_events_received", "total number of events received"),
        (
            "jetstream_total_bytes_received",
            "total uncompressed bytes received, not including websocket overhead",
        ),
        ("jetstream_total_event_errors", "total errors when handling events"),
        ("jetstream_total_events_sent", "total events sent to the consumer"),
    ];

    for (name, description) in COUNTERS {
        describe_counter!(name, Unit::Count, description);
    }
}
347
348impl JetstreamConnector {
349 /// Create a Jetstream connector with a valid [JetstreamConfig].
350 ///
351 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
352 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
353 #[cfg(feature = "metrics")]
354 describe_metrics();
355
356 // We validate the configuration here so any issues are caught early.
357 config.validate()?;
358 Ok(JetstreamConnector { config })
359 }
360
361 /// Connects to a Jetstream instance as defined in the [JetstreamConfig].
362 ///
363 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
364 /// of this receiver are dropped, the connection and task are automatically closed.
365 pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
366 self.connect_cursor(None).await
367 }
368
369 /// Connects to a Jetstream instance as defined in the [JetstreamConfig] with playback from a
370 /// cursor
371 ///
372 /// A cursor from the future will result in live-tail operation.
373 ///
374 /// The cursor is only used for first successfull connection -- on auto-reconnect it will
375 /// live-tail by default. Set `replay_on_reconnect: true` in the config if you need to
376 /// receive every event, which will keep track of the last-seen cursor and reconnect from
377 /// there.
378 pub async fn connect_cursor(
379 &self,
380 cursor: Option<Cursor>,
381 ) -> Result<JetstreamReceiver, ConnectionError> {
382 // We validate the config again for good measure. Probably not necessary but it can't hurt.
383 self.config
384 .validate()
385 .map_err(ConnectionError::InvalidConfig)?;
386
387 let (send_channel, receive_channel) = channel(self.config.channel_size);
388 let replay_on_reconnect = self.config.replay_on_reconnect;
389 let liveliness_ttl = self.config.liveliness_ttl;
390 let build_request = self.config.get_request_builder();
391
392 tokio::task::spawn(async move {
393 // TODO: maybe return the task handle so we can surface any errors
394 let max_retries = 300;
395 let base_delay_ms = 1_000; // 1 second
396 let max_delay_ms = 30_000; // 30 seconds
397 let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long
398
399 let mut retry_attempt = 0;
400 let mut connect_cursor = cursor;
401 loop {
402 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
403
404 let req = match build_request(connect_cursor) {
405 Ok(req) => req,
406 Err(e) => {
407 log::error!("Could not build jetstream websocket request: {e:?}");
408 break; // this is always fatal? no retry.
409 }
410 };
411
412 #[cfg(feature = "metrics")]
413 if let Some(host) = req.uri().host() {
414 let retry = if retry_attempt > 0 { "yes" } else { "no" };
415 counter!("jetstream_connects", "host" => host.to_string(), "retry" => retry)
416 .increment(1);
417 }
418
419 let mut last_cursor = connect_cursor;
420 retry_attempt += 1;
421 if let Ok((ws_stream, _)) = connect_async(req).await {
422 let t_connected = Instant::now();
423 log::info!("jetstream connected. starting websocket task...");
424 if let Err(e) = websocket_task(
425 dict,
426 ws_stream,
427 send_channel.clone(),
428 &mut last_cursor,
429 liveliness_ttl,
430 )
431 .await
432 {
433 match e {
434 JetstreamEventError::ReceiverClosedError => {
435 #[cfg(feature="metrics")]
436 counter!("jetstream_disconnects", "reason" => "channel", "fatal" => "yes").increment(1);
437 log::error!("Jetstream receiver channel closed. Exiting consumer.");
438 return;
439 }
440 JetstreamEventError::CompressionDictionaryError(_) => {
441 #[cfg(feature="metrics")]
442 counter!("jetstream_disconnects", "reason" => "zstd", "fatal" => "no").increment(1);
443 }
444 JetstreamEventError::NoMessagesReceived => {
445 #[cfg(feature="metrics")]
446 counter!("jetstream_disconnects", "reason" => "ttl", "fatal" => "no").increment(1);
447 }
448 JetstreamEventError::PingPongError(_) => {
449 #[cfg(feature="metrics")]
450 counter!("jetstream_disconnects", "reason" => "pingpong", "fatal" => "no").increment(1);
451 }
452 }
453 log::warn!("Jetstream closed after encountering error: {e:?}");
454 } else {
455 #[cfg(feature = "metrics")]
456 counter!("jetstream_disconnects", "reason" => "close", "fatal" => "no")
457 .increment(1);
458 log::warn!("Jetstream connection closed cleanly");
459 }
460 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
461 log::warn!("Jetstream: more than {success_threshold_s}s since last reconnect, reconnecting immediately.");
462 retry_attempt = 0;
463 }
464 }
465
466 if retry_attempt >= max_retries {
467 log::error!("jetstream: hit max retries, bye");
468 break;
469 }
470
471 connect_cursor = if replay_on_reconnect {
472 last_cursor
473 } else {
474 None
475 };
476
477 if retry_attempt > 0 {
478 // Exponential backoff
479 let delay =
480 (base_delay_ms * (2_u64.saturating_pow(retry_attempt))).min(max_delay_ms);
481 log::error!("Connection failed, retry #{retry_attempt} in {delay}ms...");
482 tokio::time::sleep(Duration::from_millis(delay)).await;
483 log::info!("Attempting to reconnect...");
484 }
485 }
486 log::error!("Connection retries exhausted. Jetstream is disconnected.");
487 });
488
489 Ok(receive_channel)
490 }
491}
492
493/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
494/// receivers that are listening for them.
495async fn websocket_task(
496 dictionary: DecoderDictionary<'_>,
497 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
498 send_channel: JetstreamSender,
499 last_cursor: &mut Option<Cursor>,
500 liveliness_ttl: Duration,
501) -> Result<(), JetstreamEventError> {
502 // TODO: Use the write half to allow the user to change configuration settings on the fly.
503 let (mut socket_write, mut socket_read) = ws.split();
504
505 let mut closing_connection = false;
506 loop {
507 let next = match timeout(liveliness_ttl, socket_read.next()).await {
508 Ok(n) => n,
509 Err(_) => {
510 log::warn!("jetstream no events for {liveliness_ttl:?}, closing");
511 _ = socket_write.close().await;
512 return Err(JetstreamEventError::NoMessagesReceived);
513 }
514 };
515 match next {
516 Some(Ok(message)) => match message {
517 Message::Text(json) => {
518 #[cfg(feature = "metrics")]
519 {
520 counter!("jetstream_total_events_received", "compressed" => "false")
521 .increment(1);
522 counter!("jetstream_total_bytes_received", "compressed" => "false")
523 .increment(json.len() as u64);
524 }
525 let event: JetstreamEvent = match serde_json::from_str(&json) {
526 Ok(ev) => ev,
527 Err(e) => {
528 #[cfg(feature = "metrics")]
529 counter!("jetstream_total_event_errors", "reason" => "deserialize")
530 .increment(1);
531 log::warn!(
532 "failed to parse json: {e:?} (from {})",
533 json.get(..24).unwrap_or(&json)
534 );
535 continue;
536 }
537 };
538 let event_cursor = event.cursor;
539
540 if let Some(last) = last_cursor {
541 if event_cursor <= *last {
542 #[cfg(feature = "metrics")]
543 counter!("jetstream_total_event_errors", "reason" => "old")
544 .increment(1);
545 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
546 continue;
547 }
548 }
549
550 if send_channel.send(event).await.is_err() {
551 log::warn!(
552 "All receivers for the Jetstream connection have been dropped, closing connection."
553 );
554 socket_write.close().await?;
555 return Err(JetstreamEventError::ReceiverClosedError);
556 } else if let Some(last) = last_cursor.as_mut() {
557 *last = event_cursor;
558 }
559 #[cfg(feature = "metrics")]
560 counter!("jetstream_total_events_sent").increment(1);
561 }
562 Message::Binary(zstd_json) => {
563 #[cfg(feature = "metrics")]
564 {
565 counter!("jetstream_total_events_received", "compressed" => "true")
566 .increment(1);
567 counter!("jetstream_total_bytes_received", "compressed" => "true")
568 .increment(zstd_json.len() as u64);
569 }
570 let mut cursor = IoCursor::new(zstd_json);
571 let decoder =
572 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
573 .map_err(JetstreamEventError::CompressionDictionaryError)?;
574
575 let event: JetstreamEvent = match serde_json::from_reader(decoder) {
576 Ok(ev) => ev,
577 Err(e) => {
578 #[cfg(feature = "metrics")]
579 counter!("jetstream_total_event_errors", "reason" => "deserialize")
580 .increment(1);
581 log::warn!("failed to parse json: {e:?}");
582 continue;
583 }
584 };
585 let event_cursor = event.cursor;
586
587 if let Some(last) = last_cursor {
588 if event_cursor <= *last {
589 #[cfg(feature = "metrics")]
590 counter!("jetstream_total_event_errors", "reason" => "old")
591 .increment(1);
592 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
593 continue;
594 }
595 }
596
597 if send_channel.send(event).await.is_err() {
598 log::warn!(
599 "All receivers for the Jetstream connection have been dropped, closing connection."
600 );
601 socket_write.close().await?;
602 return Err(JetstreamEventError::ReceiverClosedError);
603 } else if let Some(last) = last_cursor.as_mut() {
604 *last = event_cursor;
605 }
606 #[cfg(feature = "metrics")]
607 counter!("jetstream_total_events_sent").increment(1);
608 }
609 Message::Ping(vec) => {
610 log::trace!("Ping recieved, responding");
611 socket_write
612 .send(Message::Pong(vec))
613 .await
614 .map_err(JetstreamEventError::PingPongError)?;
615 }
616 Message::Close(close_frame) => {
617 log::trace!("Close recieved. I guess we just log here?");
618 if let Some(close_frame) = close_frame {
619 let reason = close_frame.reason;
620 let code = close_frame.code;
621 log::trace!("Connection closed. Reason: {reason}, Code: {code}");
622 }
623 }
624 Message::Pong(pong) => {
625 let pong_payload =
626 String::from_utf8(pong.to_vec()).unwrap_or("Invalid payload".to_string());
627 log::trace!("Pong recieved. Payload: {pong_payload}");
628 }
629 Message::Frame(_) => (),
630 },
631 Some(Err(error)) => {
632 log::error!("Web socket error: {error}");
633 closing_connection = true;
634 }
635 None => {
636 log::error!("No web socket result");
637 closing_connection = true;
638 }
639 }
640 if closing_connection {
641 log::trace!("closing connection");
642 _ = socket_write.close().await;
643 return Ok(());
644 }
645 }
646}