Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

cursor handling

Changed files
+156 -35
jetstream
src
events
+2 -3
jetstream/README.md
···
-
# jetstream-oxide
+
# fork of the awesome jetstream-oxide
-
[![Crate](https://img.shields.io/crates/v/jetstream-oxide.svg)](https://crates.io/crates/jetstream-oxide)
-
[![docs.rs](https://docs.rs/jetstream-oxide/badge.svg)](https://docs.rs/jetstream-oxide/latest/jetstream_oxide)
+
fork note: this readme is likely a bit out of date! i've been messing around with some apis.
A typed Rust library for easily interacting with and consuming the
Bluesky [Jetstream](https://github.com/bluesky-social/jetstream)
+76 -1
jetstream/src/events/mod.rs
···
pub mod commit;
pub mod identity;
+
use std::time::{
+
Duration,
+
SystemTime,
+
UNIX_EPOCH,
+
};
+
use serde::Deserialize;
use crate::exports;
+
/// Opaque wrapper for the time_us cursor used by jetstream
+
///
+
/// Generally, you should use a cursor
+
#[derive(Deserialize, Debug, Clone)]
+
pub struct Cursor(u64);
+
/// Basic data that is included with every event.
#[derive(Deserialize, Debug)]
pub struct EventInfo {
pub did: exports::Did,
-
pub time_us: u64,
+
pub time_us: Cursor,
pub kind: EventKind,
}
···
Identity,
Account,
}
+
+
impl<R> JetstreamEvent<R> {
+
pub fn cursor(&self) -> Cursor {
+
match self {
+
JetstreamEvent::Commit(commit::CommitEvent::Create { info, .. }) => {
+
info.time_us.clone()
+
}
+
JetstreamEvent::Commit(commit::CommitEvent::Update { info, .. }) => {
+
info.time_us.clone()
+
}
+
JetstreamEvent::Commit(commit::CommitEvent::Delete { info, .. }) => {
+
info.time_us.clone()
+
}
+
JetstreamEvent::Identity(e) => e.info.time_us.clone(),
+
JetstreamEvent::Account(e) => e.info.time_us.clone(),
+
}
+
}
+
}
+
+
impl Cursor {
+
/// Get a cursor that will consume all available jetstream replay
+
///
+
/// This sets the cursor to zero.
+
///
+
/// Jetstream instances typically only have a few days of replay.
+
pub fn from_start() -> Self {
+
Self(0)
+
}
+
/// Get a cursor for a specific time
+
///
+
/// Panics: if t is older than the unix epoch: Jan 1, 1970.
+
///
+
/// If you want to receive all available jetstream replay (typically a few days), use
+
/// .from_start()
+
pub fn at(t: SystemTime) -> Self {
+
let unix_dt = t
+
.duration_since(UNIX_EPOCH)
+
.expect("cannot set jetstream cursor earlier than unix epoch");
+
Self(unix_dt.as_micros() as u64)
+
}
+
/// Get a cursor rewound from now by this amount
+
///
+
/// Panics: if d is greater than the time since the unix epoch: Jan 1, 1970.
+
///
+
/// Jetstream instances typically only have a few days of replay.
+
pub fn back_by(d: Duration) -> Self {
+
Self::at(SystemTime::now() - d)
+
}
+
/// Get a Cursor from a raw u64
+
///
+
/// For example, from a jetstream event's `time_us` field.
+
pub fn from_raw_u64(time_us: u64) -> Self {
+
Self(time_us)
+
}
+
/// Get the raw u64 value from this cursor.
+
pub fn to_raw_u64(&self) -> u64 {
+
self.0
+
}
+
/// Format the cursor value for use in a jetstream connection url querystring
+
pub fn to_jetstream(&self) -> String {
+
self.0.to_string()
+
}
+
}
+78 -31
jetstream/src/lib.rs
···
use std::{
io::{
-
Cursor,
+
Cursor as IoCursor,
Read,
},
marker::PhantomData,
···
};
use atrium_api::record::KnownRecord;
-
use chrono::Utc;
use futures_util::{
stream::StreamExt,
SinkExt,
···
ConnectionError,
JetstreamEventError,
},
-
events::JetstreamEvent,
+
events::{
+
Cursor,
+
JetstreamEvent,
+
},
};
/// The Jetstream endpoints officially provided by Bluesky themselves.
···
pub wanted_dids: Vec<exports::Did>,
/// The compression algorithm to request and use for the WebSocket connection (if any).
pub compression: JetstreamCompression,
-
/// An optional timestamp to begin playback from.
+
/// Enable automatic cursor for auto-reconnect
///
-
/// An absent cursor or a cursor from the future will result in live-tail operation.
+
/// By default, reconnects will never set a cursor for the connection, so a small number of
+
/// events will always be dropped.
///
-
/// When reconnecting, use the time_us from your most recently processed event and maybe
-
/// provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback.
-
pub cursor: Option<chrono::DateTime<Utc>>,
+
/// If you want gapless playback across reconnects, set this to `true`. If you always want
+
/// the latest available events and can tolerate missing some: `false`.
+
pub replay_on_reconnect: bool,
/// Maximum size of send channel for jetstream events.
///
/// If your consuming task can't keep up with every new jetstream event in real-time,
···
wanted_collections: Vec::new(),
wanted_dids: Vec::new(),
compression: JetstreamCompression::None,
-
cursor: None,
+
replay_on_reconnect: false,
channel_size: 4096, // a few seconds of firehose buffer
record_type: PhantomData,
}
···
},
);
-
let cursor = self
-
.cursor
-
.map(|c| ("cursor", c.timestamp_micros().to_string()));
-
let params = did_search_query
.chain(collection_search_query)
.chain(std::iter::once(compression))
-
.chain(cursor)
.collect::<Vec<(&str, String)>>();
Url::parse_with_params(endpoint, params)
···
/// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
/// of this receiver are dropped, the connection and task are automatically closed.
pub async fn connect(&self) -> Result<JetstreamReceiver<R>, ConnectionError> {
+
self.base_connect(None).await
+
}
+
+
/// Connects to a Jetstream instance as defined in the [JetstreamConfig] with playback from a
+
/// cursor
+
///
+
/// A cursor from the future will result in live-tail operation.
+
///
+
/// The cursor is only used for the first successful connection -- on auto-reconnect it will
+
/// live-tail by default. Set `replay_on_reconnect: true` in the config if you need to
+
/// receive every event, which will keep track of the last-seen cursor and reconnect from
+
/// there.
+
pub async fn connect_cursor(
+
&self,
+
cursor: Cursor,
+
) -> Result<JetstreamReceiver<R>, ConnectionError> {
+
self.base_connect(Some(cursor)).await
+
}
+
+
async fn base_connect(
+
&self,
+
cursor: Option<Cursor>,
+
) -> Result<JetstreamReceiver<R>, ConnectionError> {
// We validate the config again for good measure. Probably not necessary but it can't hurt.
self.config
.validate()
···
.construct_endpoint(&self.config.endpoint)
.map_err(ConnectionError::InvalidEndpoint)?;
+
let replay_on_reconnect = self.config.replay_on_reconnect;
+
tokio::task::spawn(async move {
let max_retries = 30;
let base_delay_ms = 1_000; // 1 second
···
let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long
let mut retry_attempt = 0;
+
let mut connect_cursor = cursor;
loop {
let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
+
let mut configured_endpoint = configured_endpoint.clone();
+
if let Some(ref cursor) = connect_cursor {
+
configured_endpoint
+
.query_pairs_mut()
+
.append_pair("cursor", &cursor.to_jetstream());
+
}
+
+
let mut last_cursor = connect_cursor.clone();
+
retry_attempt += 1;
if let Ok((ws_stream, _)) = connect_async(&configured_endpoint).await {
let t_connected = Instant::now();
-
if let Err(e) = websocket_task(dict, ws_stream, send_channel.clone()).await {
+
if let Err(e) =
+
websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
+
.await
+
{
log::error!("Jetstream closed after encountering error: {e:?}");
} else {
log::error!("Jetstream connection closed cleanly");
}
if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
retry_attempt = 0;
-
continue;
}
}
if retry_attempt >= max_retries {
-
eprintln!("max retries, bye");
+
log::error!("hit max retries, bye");
break;
}
-
eprintln!("will try to reconnect");
+
connect_cursor = if replay_on_reconnect {
+
last_cursor
+
} else {
+
None
+
};
-
// Exponential backoff
-
let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt));
-
-
log::error!("Connection failed, retrying in {delay_ms}ms...");
-
tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await;
-
log::info!("Attempting to reconnect...")
+
if retry_attempt > 0 {
+
// Exponential backoff
+
let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt));
+
log::error!("Connection failed, retrying in {delay_ms}ms...");
+
tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await;
+
log::info!("Attempting to reconnect...");
+
}
}
log::error!("Connection retries exhausted. Jetstream is disconnected.");
});
···
dictionary: DecoderDictionary<'_>,
ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
send_channel: JetstreamSender<R>,
+
last_cursor: &mut Option<Cursor>,
) -> Result<(), JetstreamEventError> {
// TODO: Use the write half to allow the user to change configuration settings on the fly.
let (socket_write, mut socket_read) = ws.split();
···
Some(Ok(message)) => {
match message {
Message::Text(json) => {
-
let event = serde_json::from_str(&json)
+
let event: JetstreamEvent<R> = serde_json::from_str(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
+
let event_cursor = event.cursor();
if send_channel.send(event).await.is_err() {
// We can assume that all receivers have been dropped, so we can close
// the connection and exit the task.
log::info!(
-
"All receivers for the Jetstream connection have been dropped, closing connection."
-
);
+
"All receivers for the Jetstream connection have been dropped, closing connection."
+
);
closing_connection = true;
+
} else if let Some(v) = last_cursor.as_mut() {
+
*v = event_cursor;
}
}
Message::Binary(zstd_json) => {
-
let mut cursor = Cursor::new(zstd_json);
+
let mut cursor = IoCursor::new(zstd_json);
let mut decoder = zstd::stream::Decoder::with_prepared_dictionary(
&mut cursor,
&dictionary,
···
.read_to_string(&mut json)
.map_err(JetstreamEventError::CompressionDecoderError)?;
-
let event = serde_json::from_str(&json)
+
let event: JetstreamEvent<R> = serde_json::from_str(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
+
let event_cursor = event.cursor();
if send_channel.send(event).await.is_err() {
// We can assume that all receivers have been dropped, so we can close
// the connection and exit the task.
log::info!(
-
"All receivers for the Jetstream connection have been dropped, closing connection..."
-
);
+
"All receivers for the Jetstream connection have been dropped, closing connection..."
+
);
closing_connection = true;
+
} else if let Some(v) = last_cursor.as_mut() {
+
*v = event_cursor;
}
}
Message::Ping(vec) => {