forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod consumer;
2pub mod delay;
3pub mod error;
4pub mod removable_delay_queue;
5pub mod server;
6pub mod subscriber;
7
8use jetstream::events::CommitEvent;
9use links::CollectedLink;
10use serde::{Deserialize, Serialize};
11use server::MultiSubscribeQuery;
12use tokio_tungstenite::tungstenite::Message;
13
/// Properties of a link event that are kept alongside the serialized message
/// so the event can be filtered without re-parsing its JSON payload.
///
/// This is plain owned data, so it derives `Clone`, `PartialEq` and `Eq` in
/// addition to `Debug`; values can be duplicated and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FilterableProperties {
    /// Full unmodified link target: a DID, an at-uri, or a URL.
    pub subject: String,
    /// User/identity DID of the subject.
    ///
    /// Matches both bare DIDs and DIDs extracted from at-uris.
    /// `None` for any URL.
    pub subject_did: Option<String>,
    /// Link source: the collection NSID joined with `:` to the record
    /// property path.
    pub source: String,
}
26
27/// A serialized message with filterable properties attached
28#[derive(Debug)]
29pub struct ClientMessage {
30 pub message: Message, // always Message::Text
31 pub properties: FilterableProperties,
32}
33
34impl ClientMessage {
35 pub fn new_link(
36 link: CollectedLink,
37 at_uri: &str,
38 commit: &CommitEvent,
39 ) -> Result<Self, serde_json::Error> {
40 let subject_did = link.target.did();
41
42 let subject = link.target.into_string();
43
44 let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
45 eprintln!("link path did not have expected '.' prefix: {}", link.path);
46 ""
47 });
48 let source = format!("{}:{undotted}", &*commit.collection);
49
50 let client_link_event = ClientLinkEvent {
51 operation: "create",
52 source: source.clone(),
53 source_record: at_uri.to_string(),
54 source_rev: commit.rev.to_string(),
55 subject: subject.clone(),
56 };
57
58 let client_event = ClientEvent {
59 kind: "link",
60 origin: "live", // TODO: indicate when we're locally replaying jetstream on reconnect?? maybe not.
61 link: client_link_event,
62 };
63
64 let client_event_json = serde_json::to_string(&client_event)?;
65
66 let message = Message::Text(client_event_json.into());
67
68 let properties = FilterableProperties {
69 subject,
70 subject_did,
71 source,
72 };
73
74 Ok(ClientMessage {
75 message,
76 properties,
77 })
78 }
79}
80
81#[derive(Debug, Serialize)]
82#[serde(rename_all = "snake_case")]
83pub struct ClientEvent {
84 kind: &'static str, // "link"
85 origin: &'static str, // "live", "replay", "backfill"
86 link: ClientLinkEvent,
87}
88
89#[derive(Debug, Serialize)]
90struct ClientLinkEvent {
91 operation: &'static str, // "create", "delete" (prob no update, though maybe for rev?)
92 source: String,
93 source_record: String,
94 source_rev: String,
95 subject: String,
96 // TODO: include the record too? would save clients a level of hydration
97 // ^^ no, not for now. until we backfill + support broader deletes at *least*.
98}
99
100#[derive(Debug, Deserialize)]
101#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
102pub enum SubscriberSourcedMessage {
103 OptionsUpdate(MultiSubscribeQuery),
104}