forked from
microcosm.blue/microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod consumer;
2pub mod delay;
3pub mod error;
4pub mod removable_delay_queue;
5pub mod server;
6pub mod storage;
7pub mod subscriber;
8
9use jetstream::events::CommitEvent;
10use links::CollectedLink;
11use serde::{Deserialize, Serialize};
12use server::MultiSubscribeQuery;
13use tokio_tungstenite::tungstenite::Message;
14
15#[derive(Debug)]
16pub struct FilterableProperties {
17 /// Full unmodified DID, at-uri, or url
18 pub subject: String,
19 /// User/identity DID.
20 ///
21 /// Will match both bare-DIDs and DIDs extracted from at-uris.
22 /// `None` for any URL.
23 pub subject_did: Option<String>,
24 /// Link source -- collection NSID joined with `:` to the record property path.
25 pub source: String,
26}
27
28/// A serialized message with filterable properties attached
29#[derive(Debug)]
30pub struct ClientMessage {
31 pub message: Message, // always Message::Text
32 pub properties: FilterableProperties,
33}
34
35impl ClientMessage {
36 pub fn new_link(
37 link: CollectedLink,
38 at_uri: &str,
39 commit: &CommitEvent,
40 ) -> Result<Self, serde_json::Error> {
41 let subject_did = link.target.did();
42
43 let subject = link.target.into_string();
44
45 let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
46 eprintln!("link path did not have expected '.' prefix: {}", link.path);
47 ""
48 });
49 let source = format!("{}:{undotted}", &*commit.collection);
50
51 let client_link_event = ClientLinkEvent {
52 operation: "create",
53 source: source.clone(),
54 source_record: at_uri.to_string(),
55 source_rev: commit.rev.to_string(),
56 subject: subject.clone(),
57 };
58
59 let client_event = ClientEvent {
60 kind: "link",
61 origin: "live", // TODO: indicate when we're locally replaying jetstream on reconnect?? maybe not.
62 link: client_link_event,
63 };
64
65 let client_event_json = serde_json::to_string(&client_event)?;
66
67 let message = Message::Text(client_event_json.into());
68
69 let properties = FilterableProperties {
70 subject,
71 subject_did,
72 source,
73 };
74
75 Ok(ClientMessage {
76 message,
77 properties,
78 })
79 }
80}
81
82#[derive(Debug, Serialize)]
83#[serde(rename_all = "snake_case")]
84pub struct ClientEvent {
85 kind: &'static str, // "link"
86 origin: &'static str, // "live", "replay", "backfill"
87 link: ClientLinkEvent,
88}
89
90#[derive(Debug, Serialize)]
91struct ClientLinkEvent {
92 operation: &'static str, // "create", "delete" (prob no update, though maybe for rev?)
93 source: String,
94 source_record: String,
95 source_rev: String,
96 subject: String,
97 // TODO: include the record too? would save clients a level of hydration
98 // ^^ no, not for now. until we backfill + support broader deletes at *least*.
99}
100
101#[derive(Debug, Deserialize)]
102#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
103pub enum SubscriberSourcedMessage {
104 OptionsUpdate(MultiSubscribeQuery),
105}