Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod error; 2pub mod events; 3pub mod exports; 4 5use std::{ 6 io::Cursor as IoCursor, 7 time::{ 8 Duration, 9 Instant, 10 }, 11}; 12 13use futures_util::{ 14 stream::StreamExt, 15 SinkExt, 16}; 17#[cfg(feature = "metrics")] 18use metrics::{ 19 counter, 20 describe_counter, 21 Unit, 22}; 23use tokio::{ 24 net::TcpStream, 25 sync::mpsc::{ 26 channel, 27 Receiver, 28 Sender, 29 }, 30 time::timeout, 31}; 32use tokio_tungstenite::{ 33 connect_async, 34 tungstenite::{ 35 client::{ 36 ClientRequestBuilder, 37 IntoClientRequest, 38 }, 39 handshake::client::Request, 40 Message, 41 }, 42 MaybeTlsStream, 43 WebSocketStream, 44}; 45use url::Url; 46use zstd::dict::DecoderDictionary; 47 48use crate::{ 49 error::{ 50 ConfigValidationError, 51 ConnectionError, 52 JetstreamEventError, 53 }, 54 events::{ 55 Cursor, 56 JetstreamEvent, 57 }, 58}; 59 60/// The Jetstream endpoints officially provided by Bluesky themselves. 61/// 62/// There are no guarantees that these endpoints will always be available, but you are free 63/// to run your own Jetstream instance in any case. 64pub enum DefaultJetstreamEndpoints { 65 /// `jetstream1.us-east.bsky.network` 66 USEastOne, 67 /// `jetstream2.us-east.bsky.network` 68 USEastTwo, 69 /// `jetstream1.us-west.bsky.network` 70 USWestOne, 71 /// `jetstream2.us-west.bsky.network` 72 USWestTwo, 73} 74 75impl DefaultJetstreamEndpoints { 76 /// Helper to reference official jetstream instances by shortcut 77 /// 78 /// This function will pass through a jetstream endpoint URL unless it matches a shortcut, 79 /// in which case it will be rewritten to the corresponding bluesky-operated jetstream endpoint 80 /// URL. 81 /// 82 /// The shortcuts available are 83 /// - 'us-east-1' 84 /// - 'us-east-2' 85 /// - 'us-west-1' 86 /// - 'us-west-2' 87 pub fn endpoint_or_shortcut(s: &str) -> String { 88 match s { 89 "us-east-1" => DefaultJetstreamEndpoints::USEastOne.into(), 90 "us-east-2" => DefaultJetstreamEndpoints::USEastTwo.into(), 91 "us-west-1" => DefaultJetstreamEndpoints::USWestOne.into(), 92 "us-west-2" => DefaultJetstreamEndpoints::USWestTwo.into(), 93 custom => custom.into(), 94 } 95 } 96} 97 98impl From<DefaultJetstreamEndpoints> for String { 99 fn from(endpoint: DefaultJetstreamEndpoints) -> Self { 100 match endpoint { 101 DefaultJetstreamEndpoints::USEastOne => { 102 "wss://jetstream1.us-east.bsky.network/subscribe".to_owned() 103 } 104 DefaultJetstreamEndpoints::USEastTwo => { 105 "wss://jetstream2.us-east.bsky.network/subscribe".to_owned() 106 } 107 DefaultJetstreamEndpoints::USWestOne => { 108 "wss://jetstream1.us-west.bsky.network/subscribe".to_owned() 109 } 110 DefaultJetstreamEndpoints::USWestTwo => { 111 "wss://jetstream2.us-west.bsky.network/subscribe".to_owned() 112 } 113 } 114 } 115} 116 117/// The maximum number of wanted collections that can be requested on a single Jetstream connection. 118const MAX_WANTED_COLLECTIONS: usize = 100; 119/// The maximum number of wanted DIDs that can be requested on a single Jetstream connection. 120const MAX_WANTED_DIDS: usize = 10_000; 121 122/// The custom `zstd` dictionary used for decoding compressed Jetstream messages. 123/// 124/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models) 125const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary"); 126 127/// A receiver channel for consuming Jetstream events. 128pub type JetstreamReceiver = Receiver<JetstreamEvent>; 129 130/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s. 131type JetstreamSender = Sender<JetstreamEvent>; 132 133/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to 134/// receive and consume events. See [JetstreamConnector::connect] for more info. 135pub struct JetstreamConnector { 136 /// The configuration for the Jetstream connection. 137 config: JetstreamConfig, 138} 139 140pub enum JetstreamCompression { 141 /// No compression, just raw plaintext JSON. 142 None, 143 /// Use the `zstd` compression algorithm, which can result in a ~56% smaller messages on 144 /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info. 145 Zstd, 146} 147 148impl From<JetstreamCompression> for bool { 149 fn from(compression: JetstreamCompression) -> Self { 150 match compression { 151 JetstreamCompression::None => false, 152 JetstreamCompression::Zstd => true, 153 } 154 } 155} 156 157impl From<bool> for JetstreamCompression { 158 fn from(compress: bool) -> Self { 159 if compress { 160 JetstreamCompression::Zstd 161 } else { 162 JetstreamCompression::None 163 } 164 } 165} 166 167pub struct JetstreamConfig { 168 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e. 169 /// `wss://jetstream1.us-east.bsky.network/subscribe`. 170 pub endpoint: String, 171 /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for. 172 /// 173 /// An empty list will receive events for *all* collections. 174 /// 175 /// Regardless of desired collections, all subscribers receive 176 /// [AccountEvent](events::account::AccountEvent) and 177 /// [IdentityEvent](events::identity::Identity) events. 178 pub wanted_collections: Vec<exports::Nsid>, 179 /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for. 180 /// 181 /// An empty list will receive events for *all* repos, which is a lot of events! 182 pub wanted_dids: Vec<exports::Did>, 183 /// The compression algorithm to request and use for the WebSocket connection (if any). 184 pub compression: JetstreamCompression, 185 /// User agent string to include with the jetstream connection request 186 pub user_agent: Option<String>, 187 /// Do not append jetstream client info to user agent string 188 pub omit_user_agent_jetstream_info: bool, 189 /// Enable automatic cursor for auto-reconnect 190 /// 191 /// By default, reconnects will never set a cursor for the connection, so a small number of 192 /// events will always be dropped. 193 /// 194 /// If you want gapless playback across reconnects, set this to `true`. If you always want 195 /// the latest available events and can tolerate missing some: `false`. 196 pub replay_on_reconnect: bool, 197 /// Maximum size of send channel for jetstream events. 198 /// 199 /// If your consuming task can't keep up with every new jetstream event in real-time, 200 /// you might get disconnected from the server as a "slow consumer". Increasing channel_size 201 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory 202 /// usage while events are buffered. 203 pub channel_size: usize, 204 /// How long since the last jetstream message before we consider the connection dead 205 /// 206 /// Default: 15s 207 pub liveliness_ttl: Duration, 208} 209 210impl Default for JetstreamConfig { 211 fn default() -> Self { 212 JetstreamConfig { 213 endpoint: DefaultJetstreamEndpoints::USEastOne.into(), 214 wanted_collections: Vec::new(), 215 wanted_dids: Vec::new(), 216 compression: JetstreamCompression::None, 217 user_agent: None, 218 omit_user_agent_jetstream_info: false, 219 replay_on_reconnect: false, 220 channel_size: 4096, // a few seconds of firehose buffer 221 liveliness_ttl: Duration::from_secs(15), 222 } 223 } 224} 225 226impl JetstreamConfig { 227 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied. 228 pub fn get_request_builder( 229 &self, 230 ) -> impl Fn(Option<Cursor>) -> Result<Request, ConnectionError> { 231 let did_search_query = self 232 .wanted_dids 233 .iter() 234 .map(|s| ("wantedDids", s.to_string())); 235 236 let collection_search_query = self 237 .wanted_collections 238 .iter() 239 .map(|s| ("wantedCollections", s.to_string())); 240 241 let compression = ( 242 "compress", 243 match self.compression { 244 JetstreamCompression::None => "false".to_owned(), 245 JetstreamCompression::Zstd => "true".to_owned(), 246 }, 247 ); 248 249 let base_params = did_search_query 250 .chain(collection_search_query) 251 .chain(std::iter::once(compression)) 252 .collect::<Vec<(&'static str, String)>>(); 253 254 let ua_info: Option<String> = if self.omit_user_agent_jetstream_info { 255 None 256 } else { 257 Some(format!( 258 "v{} via jetstream-oxide (microcosm/links fork)", 259 env!("CARGO_PKG_VERSION") 260 )) 261 }; 262 let maybe_ua = match (&self.user_agent, ua_info) { 263 (Some(ua), Some(info)) => Some(format!("{ua} {info}")), 264 (Some(ua), None) => Some(ua.clone()), 265 (None, Some(info)) => Some(info.clone()), 266 (None, None) => None, 267 }; 268 269 let endpoint = self.endpoint.clone(); 270 move |maybe_cursor: Option<Cursor>| { 271 let mut params = base_params.clone(); 272 if let Some(ref cursor) = maybe_cursor { 273 params.push(("cursor", cursor.to_jetstream())); 274 } 275 let url = Url::parse_with_params(&endpoint, params)?; 276 277 let mut req = ClientRequestBuilder::new(url.as_str().parse()?); 278 if let Some(ua) = &maybe_ua { 279 req = req.with_header("user-agent", ua) 280 }; 281 Ok(req.into_client_request()?) 282 } 283 } 284 285 /// Validates the configuration to make sure it is within the limits of the Jetstream API. 286 /// 287 /// # Constants 288 /// The following constants are used to validate the configuration and should only be changed 289 /// if the Jetstream API has itself changed. 290 /// - [MAX_WANTED_COLLECTIONS] 291 /// - [MAX_WANTED_DIDS] 292 /// 293 /// # Endpoint 294 /// 295 /// The provided `endpoint` is attempted to be parsed so that any errors occur early. 296 pub fn validate(&self) -> Result<(), ConfigValidationError> { 297 let collections = self.wanted_collections.len(); 298 let dids = self.wanted_dids.len(); 299 300 if collections > MAX_WANTED_COLLECTIONS { 301 return Err(ConfigValidationError::TooManyWantedCollections(collections)); 302 } 303 304 if dids > MAX_WANTED_DIDS { 305 return Err(ConfigValidationError::TooManyDids(dids)); 306 } 307 308 let _ = self.endpoint.parse::<Url>()?; 309 310 Ok(()) 311 } 312} 313 314#[cfg(feature = "metrics")] 315fn describe_metrics() { 316 describe_counter!( 317 "jetstream_connects", 318 Unit::Count, 319 "how many times we've tried to connect" 320 ); 321 describe_counter!( 322 "jetstream_disconnects", 323 Unit::Count, 324 "how many times we've been disconnected" 325 ); 326 describe_counter!( 327 "jetstream_total_events_received", 328 Unit::Count, 329 "total number of events received" 330 ); 331 describe_counter!( 332 "jetstream_total_bytes_received", 333 Unit::Count, 334 "total uncompressed bytes received, not including websocket overhead" 335 ); 336 describe_counter!( 337 "jetstream_total_event_errors", 338 Unit::Count, 339 "total errors when handling events" 340 ); 341 describe_counter!( 342 "jetstream_total_events_sent", 343 Unit::Count, 344 "total events sent to the consumer" 345 ); 346} 347 348impl JetstreamConnector { 349 /// Create a Jetstream connector with a valid [JetstreamConfig]. 350 /// 351 /// After creation, you can call [connect] to connect to the provided Jetstream instance. 352 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> { 353 #[cfg(feature = "metrics")] 354 describe_metrics(); 355 356 // We validate the configuration here so any issues are caught early. 357 config.validate()?; 358 Ok(JetstreamConnector { config }) 359 } 360 361 /// Connects to a Jetstream instance as defined in the [JetstreamConfig]. 362 /// 363 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances 364 /// of this receiver are dropped, the connection and task are automatically closed. 365 pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> { 366 self.connect_cursor(None).await 367 } 368 369 /// Connects to a Jetstream instance as defined in the [JetstreamConfig] with playback from a 370 /// cursor 371 /// 372 /// A cursor from the future will result in live-tail operation. 373 /// 374 /// The cursor is only used for first successfull connection -- on auto-reconnect it will 375 /// live-tail by default. Set `replay_on_reconnect: true` in the config if you need to 376 /// receive every event, which will keep track of the last-seen cursor and reconnect from 377 /// there. 378 pub async fn connect_cursor( 379 &self, 380 cursor: Option<Cursor>, 381 ) -> Result<JetstreamReceiver, ConnectionError> { 382 // We validate the config again for good measure. Probably not necessary but it can't hurt. 383 self.config 384 .validate() 385 .map_err(ConnectionError::InvalidConfig)?; 386 387 let (send_channel, receive_channel) = channel(self.config.channel_size); 388 let replay_on_reconnect = self.config.replay_on_reconnect; 389 let liveliness_ttl = self.config.liveliness_ttl; 390 let build_request = self.config.get_request_builder(); 391 392 tokio::task::spawn(async move { 393 // TODO: maybe return the task handle so we can surface any errors 394 let max_retries = 300; 395 let base_delay_ms = 1_000; // 1 second 396 let max_delay_ms = 30_000; // 30 seconds 397 let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long 398 399 let mut retry_attempt = 0; 400 let mut connect_cursor = cursor; 401 loop { 402 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 403 404 let req = match build_request(connect_cursor) { 405 Ok(req) => req, 406 Err(e) => { 407 log::error!("Could not build jetstream websocket request: {e:?}"); 408 break; // this is always fatal? no retry. 409 } 410 }; 411 412 #[cfg(feature = "metrics")] 413 if let Some(host) = req.uri().host() { 414 let retry = if retry_attempt > 0 { "yes" } else { "no" }; 415 counter!("jetstream_connects", "host" => host.to_string(), "retry" => retry) 416 .increment(1); 417 } 418 419 let mut last_cursor = connect_cursor; 420 retry_attempt += 1; 421 if let Ok((ws_stream, _)) = connect_async(req).await { 422 let t_connected = Instant::now(); 423 log::info!("jetstream connected. starting websocket task..."); 424 if let Err(e) = websocket_task( 425 dict, 426 ws_stream, 427 send_channel.clone(), 428 &mut last_cursor, 429 liveliness_ttl, 430 ) 431 .await 432 { 433 match e { 434 JetstreamEventError::ReceiverClosedError => { 435 #[cfg(feature="metrics")] 436 counter!("jetstream_disconnects", "reason" => "channel", "fatal" => "yes").increment(1); 437 log::error!("Jetstream receiver channel closed. Exiting consumer."); 438 return; 439 } 440 JetstreamEventError::CompressionDictionaryError(_) => { 441 #[cfg(feature="metrics")] 442 counter!("jetstream_disconnects", "reason" => "zstd", "fatal" => "no").increment(1); 443 } 444 JetstreamEventError::NoMessagesReceived => { 445 #[cfg(feature="metrics")] 446 counter!("jetstream_disconnects", "reason" => "ttl", "fatal" => "no").increment(1); 447 } 448 JetstreamEventError::PingPongError(_) => { 449 #[cfg(feature="metrics")] 450 counter!("jetstream_disconnects", "reason" => "pingpong", "fatal" => "no").increment(1); 451 } 452 } 453 log::warn!("Jetstream closed after encountering error: {e:?}"); 454 } else { 455 #[cfg(feature = "metrics")] 456 counter!("jetstream_disconnects", "reason" => "close", "fatal" => "no") 457 .increment(1); 458 log::warn!("Jetstream connection closed cleanly"); 459 } 460 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) { 461 log::warn!("Jetstream: more than {success_threshold_s}s since last reconnect, reconnecting immediately."); 462 retry_attempt = 0; 463 } 464 } 465 466 if retry_attempt >= max_retries { 467 log::error!("jetstream: hit max retries, bye"); 468 break; 469 } 470 471 connect_cursor = if replay_on_reconnect { 472 last_cursor 473 } else { 474 None 475 }; 476 477 if retry_attempt > 0 { 478 // Exponential backoff 479 let delay = 480 (base_delay_ms * (2_u64.saturating_pow(retry_attempt))).min(max_delay_ms); 481 log::error!("Connection failed, retry #{retry_attempt} in {delay}ms..."); 482 tokio::time::sleep(Duration::from_millis(delay)).await; 483 log::info!("Attempting to reconnect..."); 484 } 485 } 486 log::error!("Connection retries exhausted. Jetstream is disconnected."); 487 }); 488 489 Ok(receive_channel) 490 } 491} 492 493/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any 494/// receivers that are listening for them. 495async fn websocket_task( 496 dictionary: DecoderDictionary<'_>, 497 ws: WebSocketStream<MaybeTlsStream<TcpStream>>, 498 send_channel: JetstreamSender, 499 last_cursor: &mut Option<Cursor>, 500 liveliness_ttl: Duration, 501) -> Result<(), JetstreamEventError> { 502 // TODO: Use the write half to allow the user to change configuration settings on the fly. 503 let (mut socket_write, mut socket_read) = ws.split(); 504 505 let mut closing_connection = false; 506 loop { 507 let next = match timeout(liveliness_ttl, socket_read.next()).await { 508 Ok(n) => n, 509 Err(_) => { 510 log::warn!("jetstream no events for {liveliness_ttl:?}, closing"); 511 _ = socket_write.close().await; 512 return Err(JetstreamEventError::NoMessagesReceived); 513 } 514 }; 515 match next { 516 Some(Ok(message)) => match message { 517 Message::Text(json) => { 518 #[cfg(feature = "metrics")] 519 { 520 counter!("jetstream_total_events_received", "compressed" => "false") 521 .increment(1); 522 counter!("jetstream_total_bytes_received", "compressed" => "false") 523 .increment(json.len() as u64); 524 } 525 let event: JetstreamEvent = match serde_json::from_str(&json) { 526 Ok(ev) => ev, 527 Err(e) => { 528 #[cfg(feature = "metrics")] 529 counter!("jetstream_total_event_errors", "reason" => "deserialize") 530 .increment(1); 531 log::warn!( 532 "failed to parse json: {e:?} (from {})", 533 json.get(..24).unwrap_or(&json) 534 ); 535 continue; 536 } 537 }; 538 let event_cursor = event.cursor; 539 540 if let Some(last) = last_cursor { 541 if event_cursor <= *last { 542 #[cfg(feature = "metrics")] 543 counter!("jetstream_total_event_errors", "reason" => "old") 544 .increment(1); 545 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event."); 546 continue; 547 } 548 } 549 550 if send_channel.send(event).await.is_err() { 551 log::warn!( 552 "All receivers for the Jetstream connection have been dropped, closing connection." 553 ); 554 socket_write.close().await?; 555 return Err(JetstreamEventError::ReceiverClosedError); 556 } else if let Some(last) = last_cursor.as_mut() { 557 *last = event_cursor; 558 } 559 #[cfg(feature = "metrics")] 560 counter!("jetstream_total_events_sent").increment(1); 561 } 562 Message::Binary(zstd_json) => { 563 #[cfg(feature = "metrics")] 564 { 565 counter!("jetstream_total_events_received", "compressed" => "true") 566 .increment(1); 567 counter!("jetstream_total_bytes_received", "compressed" => "true") 568 .increment(zstd_json.len() as u64); 569 } 570 let mut cursor = IoCursor::new(zstd_json); 571 let decoder = 572 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary) 573 .map_err(JetstreamEventError::CompressionDictionaryError)?; 574 575 let event: JetstreamEvent = match serde_json::from_reader(decoder) { 576 Ok(ev) => ev, 577 Err(e) => { 578 #[cfg(feature = "metrics")] 579 counter!("jetstream_total_event_errors", "reason" => "deserialize") 580 .increment(1); 581 log::warn!("failed to parse json: {e:?}"); 582 continue; 583 } 584 }; 585 let event_cursor = event.cursor; 586 587 if let Some(last) = last_cursor { 588 if event_cursor <= *last { 589 #[cfg(feature = "metrics")] 590 counter!("jetstream_total_event_errors", "reason" => "old") 591 .increment(1); 592 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event."); 593 continue; 594 } 595 } 596 597 if send_channel.send(event).await.is_err() { 598 log::warn!( 599 "All receivers for the Jetstream connection have been dropped, closing connection." 600 ); 601 socket_write.close().await?; 602 return Err(JetstreamEventError::ReceiverClosedError); 603 } else if let Some(last) = last_cursor.as_mut() { 604 *last = event_cursor; 605 } 606 #[cfg(feature = "metrics")] 607 counter!("jetstream_total_events_sent").increment(1); 608 } 609 Message::Ping(vec) => { 610 log::trace!("Ping recieved, responding"); 611 socket_write 612 .send(Message::Pong(vec)) 613 .await 614 .map_err(JetstreamEventError::PingPongError)?; 615 } 616 Message::Close(close_frame) => { 617 log::trace!("Close recieved. I guess we just log here?"); 618 if let Some(close_frame) = close_frame { 619 let reason = close_frame.reason; 620 let code = close_frame.code; 621 log::trace!("Connection closed. Reason: {reason}, Code: {code}"); 622 } 623 } 624 Message::Pong(pong) => { 625 let pong_payload = 626 String::from_utf8(pong.to_vec()).unwrap_or("Invalid payload".to_string()); 627 log::trace!("Pong recieved. Payload: {pong_payload}"); 628 } 629 Message::Frame(_) => (), 630 }, 631 Some(Err(error)) => { 632 log::error!("Web socket error: {error}"); 633 closing_connection = true; 634 } 635 None => { 636 log::error!("No web socket result"); 637 closing_connection = true; 638 } 639 } 640 if closing_connection { 641 log::trace!("closing connection"); 642 _ = socket_write.close().await; 643 return Ok(()); 644 } 645 } 646}