Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 3.0 kB view raw
1pub mod consumer; 2pub mod delay; 3pub mod error; 4pub mod removable_delay_queue; 5pub mod server; 6pub mod subscriber; 7 8use jetstream::events::CommitEvent; 9use links::CollectedLink; 10use serde::{Deserialize, Serialize}; 11use server::MultiSubscribeQuery; 12use tokio_tungstenite::tungstenite::Message; 13 14#[derive(Debug)] 15pub struct FilterableProperties { 16 /// Full unmodified DID, at-uri, or url 17 pub subject: String, 18 /// User/identity DID. 19 /// 20 /// Will match both bare-DIDs and DIDs extracted from at-uris. 21 /// `None` for any URL. 22 pub subject_did: Option<String>, 23 /// Link source -- collection NSID joined with `:` to the record property path. 24 pub source: String, 25} 26 27/// A serialized message with filterable properties attached 28#[derive(Debug)] 29pub struct ClientMessage { 30 pub message: Message, // always Message::Text 31 pub properties: FilterableProperties, 32} 33 34impl ClientMessage { 35 pub fn new_link( 36 link: CollectedLink, 37 at_uri: &str, 38 commit: &CommitEvent, 39 ) -> Result<Self, serde_json::Error> { 40 let subject_did = link.target.did(); 41 42 let subject = link.target.into_string(); 43 44 let undotted = link.path.strip_prefix('.').unwrap_or_else(|| { 45 eprintln!("link path did not have expected '.' prefix: {}", link.path); 46 "" 47 }); 48 let source = format!("{}:{undotted}", &*commit.collection); 49 50 let client_link_event = ClientLinkEvent { 51 operation: "create", 52 source: source.clone(), 53 source_record: at_uri.to_string(), 54 source_rev: commit.rev.to_string(), 55 subject: subject.clone(), 56 }; 57 58 let client_event = ClientEvent { 59 kind: "link", 60 origin: "live", // TODO: indicate when we're locally replaying jetstream on reconnect?? maybe not. 61 link: client_link_event, 62 }; 63 64 let client_event_json = serde_json::to_string(&client_event)?; 65 66 let message = Message::Text(client_event_json.into()); 67 68 let properties = FilterableProperties { 69 subject, 70 subject_did, 71 source, 72 }; 73 74 Ok(ClientMessage { 75 message, 76 properties, 77 }) 78 } 79} 80 81#[derive(Debug, Serialize)] 82#[serde(rename_all = "snake_case")] 83pub struct ClientEvent { 84 kind: &'static str, // "link" 85 origin: &'static str, // "live", "replay", "backfill" 86 link: ClientLinkEvent, 87} 88 89#[derive(Debug, Serialize)] 90struct ClientLinkEvent { 91 operation: &'static str, // "create", "delete" (prob no update, though maybe for rev?) 92 source: String, 93 source_record: String, 94 source_rev: String, 95 subject: String, 96 // TODO: include the record too? would save clients a level of hydration 97 // ^^ no, not for now. until we backfill + support broader deletes at *least*. 98} 99 100#[derive(Debug, Deserialize)] 101#[serde(tag = "type", content = "payload", rename_all = "snake_case")] 102pub enum SubscriberSourcedMessage { 103 OptionsUpdate(MultiSubscribeQuery), 104}