tracks lexicons and how many times they appeared on the jetstream

feat(server): allow using multiple jetstream urls

ptr.pet a4dd7621 c0968d55

verified
Changed files
+34 -19
server
+21 -11
server/src/jetstream.rs
···
pub struct JetstreamClient {
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
tls_connector: tokio_websockets::Connector,
-
url: SmolStr,
+
urls: Vec<SmolStr>,
}
impl JetstreamClient {
-
pub fn new(url: &str) -> AppResult<Self> {
+
pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> {
Ok(Self {
stream: None,
tls_connector: tokio_websockets::Connector::new()?,
-
url: SmolStr::new(url),
+
urls: urls.into_iter().map(Into::into).collect(),
})
}
pub async fn connect(&mut self) -> AppResult<()> {
-
let (stream, _) = ClientBuilder::new()
-
.connector(&self.tls_connector)
-
.uri(&self.url)?
-
.connect()
-
.await?;
-
self.stream = Some(stream);
-
tracing::info!("connected to jetstream ({})", self.url);
-
Ok(())
+
for uri in &self.urls {
+
let conn_result = ClientBuilder::new()
+
.connector(&self.tls_connector)
+
.uri(uri)?
+
.connect()
+
.await;
+
match conn_result {
+
Ok((stream, _)) => {
+
self.stream = Some(stream);
+
tracing::info!("connected to jetstream {}", uri);
+
return Ok(());
+
}
+
Err(err) => {
+
tracing::error!("failed to connect to jetstream {uri}: {err}");
+
}
+
};
+
}
+
Err(anyhow!("failed to connect to any jetstream server").into())
}
// automatically retries connection, only returning error if it fails many times
+13 -8
server/src/main.rs
···
.install_default()
.expect("cant install rustls crypto provider");
-
let mut jetstream =
-
match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
-
Ok(client) => client,
-
Err(err) => {
-
tracing::error!("can't create jetstream client: {err}");
-
return;
-
}
-
};
+
let urls = [
+
"wss://jetstream2.fr.hose.cam/subscribe",
+
"wss://jetstream.fire.hose.cam/subscribe",
+
"wss://jetstream1.us-west.bsky.network/subscribe",
+
"wss://jetstream2.us-west.bsky.network/subscribe",
+
];
+
let mut jetstream = match JetstreamClient::new(urls) {
+
Ok(client) => client,
+
Err(err) => {
+
tracing::error!("can't create jetstream client: {err}");
+
return;
+
}
+
};
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
let consume_events = tokio::spawn({