Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

subscriber sourced messages

Changed files
+54 -5
spacedust
+12
spacedust/src/error.rs
···
}
#[derive(Debug, Error)]
pub enum DelayError {
#[error("delay ended")]
DelayEnded,
···
}
#[derive(Debug, Error)]
+
pub enum SubscriberUpdateError {
+
#[error("failed to parse json for subscriber update: {0}")]
+
FailedToParseMessage(serde_json::Error),
+
#[error("more wantedSources were requested than allowed (max 1,000)")]
+
TooManySourcesWanted,
+
#[error("more wantedSubjectDids were requested than allowed (max 10,000)")]
+
TooManyDidsWanted,
+
#[error("more wantedSubjects were requested than allowed (max 50,000)")]
+
TooManySubjectsWanted,
+
}
+
+
#[derive(Debug, Error)]
pub enum DelayError {
#[error("delay ended")]
DelayEnded,
+8 -1
spacedust/src/lib.rs
···
use links::CollectedLink;
use jetstream::events::CommitEvent;
use tokio_tungstenite::tungstenite::Message;
-
use serde::Serialize;
#[derive(Debug)]
pub struct FilterableProperties {
···
// TODO: include the record too? would save clients a level of hydration
// ^^ no, not for now. until we backfill + support broader deletes at *least*.
}
···
use links::CollectedLink;
use jetstream::events::CommitEvent;
use tokio_tungstenite::tungstenite::Message;
+
use serde::{Deserialize, Serialize};
+
use server::MultiSubscribeQuery;
#[derive(Debug)]
pub struct FilterableProperties {
···
// TODO: include the record too? would save clients a level of hydration
// ^^ no, not for now. until we backfill + support broader deletes at *least*.
}
+
+
#[derive(Debug, Deserialize)]
+
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
+
pub enum SubscriberSourcedMessage {
+
OptionsUpdate(MultiSubscribeQuery),
+
}
+4 -2
spacedust/src/server.rs
···
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::time::Instant;
-
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_util::sync::CancellationToken;
use async_trait::async_trait;
use std::collections::HashSet;
···
let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded.into_inner(),
Role::Server,
-
None,
)
.await;
···
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::time::Instant;
+
use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig};
use tokio_util::sync::CancellationToken;
use async_trait::async_trait;
use std::collections::HashSet;
···
let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded.into_inner(),
Role::Server,
+
Some(WebSocketConfig::default().max_message_size(
+
Some(10 * 2_usize.pow(20)) // 10MiB, matching jetstream
+
)),
)
.await;
+30 -2
spacedust/src/subscriber.rs
···
use std::sync::Arc;
use tokio::time::interval;
use std::time::Duration;
use futures::StreamExt;
-
use crate::{ClientMessage, FilterableProperties};
use crate::server::MultiSubscribeQuery;
use futures::SinkExt;
use std::error::Error;
···
}
pub async fn start(
-
self,
ws: WebSocketStream<WebsocketConnectionRaw>,
mut receiver: broadcast::Receiver<Arc<ClientMessage>>
) -> Result<(), Box<dyn Error>> {
···
self.shutdown.cancel();
}
}
Some(Ok(m)) => log::trace!("subscriber sent an unexpected message: {m:?}"),
Some(Err(e)) => {
log::error!("failed to receive subscriber message: {e:?}");
···
true
}
}
···
+
use crate::error::SubscriberUpdateError;
use std::sync::Arc;
use tokio::time::interval;
use std::time::Duration;
use futures::StreamExt;
+
use crate::{ClientMessage, FilterableProperties, SubscriberSourcedMessage};
use crate::server::MultiSubscribeQuery;
use futures::SinkExt;
use std::error::Error;
···
}
pub async fn start(
+
mut self,
ws: WebSocketStream<WebsocketConnectionRaw>,
mut receiver: broadcast::Receiver<Arc<ClientMessage>>
) -> Result<(), Box<dyn Error>> {
···
self.shutdown.cancel();
}
}
+
Some(Ok(Message::Text(raw))) => {
+
if let Err(e) = self.query.update_from_raw(&raw) {
+
log::error!("subscriber options could not be updated, dropping: {e:?}");
+
// TODO: send client an explanation
+
self.shutdown.cancel();
+
}
+
},
Some(Ok(m)) => log::trace!("subscriber sent an unexpected message: {m:?}"),
Some(Err(e)) => {
log::error!("failed to receive subscriber message: {e:?}");
···
true
}
}
+
+
+
+
impl MultiSubscribeQuery {
+
pub fn update_from_raw(&mut self, s: &str) -> Result<(), SubscriberUpdateError> {
+
let SubscriberSourcedMessage::OptionsUpdate(opts) = serde_json::from_str(s)
+
.map_err(SubscriberUpdateError::FailedToParseMessage)?;
+
if opts.wanted_sources.len() > 1_000 {
+
return Err(SubscriberUpdateError::TooManySourcesWanted);
+
}
+
if opts.wanted_subject_dids.len() > 10_000 {
+
return Err(SubscriberUpdateError::TooManyDidsWanted);
+
}
+
if opts.wanted_subjects.len() > 50_000 {
+
return Err(SubscriberUpdateError::TooManySubjectsWanted);
+
}
+
*self = opts;
+
Ok(())
+
}
+
}