Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod consumer; 2pub mod delay; 3pub mod error; 4pub mod removable_delay_queue; 5pub mod server; 6pub mod storage; 7pub mod subscriber; 8 9use jetstream::events::CommitEvent; 10use links::CollectedLink; 11use serde::{Deserialize, Serialize}; 12use server::MultiSubscribeQuery; 13use tokio_tungstenite::tungstenite::Message; 14 15#[derive(Debug)] 16pub struct FilterableProperties { 17 /// Full unmodified DID, at-uri, or url 18 pub subject: String, 19 /// User/identity DID. 20 /// 21 /// Will match both bare-DIDs and DIDs extracted from at-uris. 22 /// `None` for any URL. 23 pub subject_did: Option<String>, 24 /// Link source -- collection NSID joined with `:` to the record property path. 25 pub source: String, 26} 27 28/// A serialized message with filterable properties attached 29#[derive(Debug)] 30pub struct ClientMessage { 31 pub message: Message, // always Message::Text 32 pub properties: FilterableProperties, 33} 34 35impl ClientMessage { 36 pub fn new_link( 37 link: CollectedLink, 38 at_uri: &str, 39 commit: &CommitEvent, 40 ) -> Result<Self, serde_json::Error> { 41 let subject_did = link.target.did(); 42 43 let subject = link.target.into_string(); 44 45 let undotted = link.path.strip_prefix('.').unwrap_or_else(|| { 46 eprintln!("link path did not have expected '.' prefix: {}", link.path); 47 "" 48 }); 49 let source = format!("{}:{undotted}", &*commit.collection); 50 51 let client_link_event = ClientLinkEvent { 52 operation: "create", 53 source: source.clone(), 54 source_record: at_uri.to_string(), 55 source_rev: commit.rev.to_string(), 56 subject: subject.clone(), 57 }; 58 59 let client_event = ClientEvent { 60 kind: "link", 61 origin: "live", // TODO: indicate when we're locally replaying jetstream on reconnect?? maybe not. 62 link: client_link_event, 63 }; 64 65 let client_event_json = serde_json::to_string(&client_event)?; 66 67 let message = Message::Text(client_event_json.into()); 68 69 let properties = FilterableProperties { 70 subject, 71 subject_did, 72 source, 73 }; 74 75 Ok(ClientMessage { 76 message, 77 properties, 78 }) 79 } 80} 81 82#[derive(Debug, Serialize)] 83#[serde(rename_all = "snake_case")] 84pub struct ClientEvent { 85 kind: &'static str, // "link" 86 origin: &'static str, // "live", "replay", "backfill" 87 link: ClientLinkEvent, 88} 89 90#[derive(Debug, Serialize)] 91struct ClientLinkEvent { 92 operation: &'static str, // "create", "delete" (prob no update, though maybe for rev?) 93 source: String, 94 source_record: String, 95 source_rev: String, 96 subject: String, 97 // TODO: include the record too? would save clients a level of hydration 98 // ^^ no, not for now. until we backfill + support broader deletes at *least*. 99} 100 101#[derive(Debug, Deserialize)] 102#[serde(tag = "type", content = "payload", rename_all = "snake_case")] 103pub enum SubscriberSourcedMessage { 104 OptionsUpdate(MultiSubscribeQuery), 105}