relay filter/appview bootstrap
1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use futures::{SinkExt, StreamExt};
6use serde::Deserialize;
7use tokio::sync::broadcast;
8use tokio_tungstenite::{connect_async, tungstenite::Message};
9
10use crate::db;
11use crate::records::{
12 CidRef, NewAccount, NewRecord, CHANNEL_COLLECTION, INVITE_COLLECTION, LATTICE_COLLECTION,
13 MEMBERSHIP_COLLECTION, SHARD_COLLECTION,
14};
15use crate::AppState;
16
17#[derive(Debug, Deserialize)]
18pub struct TapMessage {
19 pub id: u64,
20 #[serde(rename = "type")]
21 pub msg_type: String,
22 pub record: Option<TapRecordEvent>,
23}
24
25#[derive(Debug, serde::Serialize)]
26struct TapAck {
27 #[serde(rename = "type")]
28 msg_type: &'static str,
29 id: u64,
30}
31
32impl TapAck {
33 fn new(id: u64) -> Self {
34 Self { msg_type: "ack", id }
35 }
36}
37
38#[derive(Debug, Deserialize)]
39pub struct TapRecordEvent {
40 pub did: String,
41 pub collection: String,
42 pub rkey: String,
43 pub cid: String,
44 pub action: String,
45 pub record: Option<serde_json::Value>,
46 #[serde(default)]
47 pub live: bool,
48}
49
50fn parse_datetime(s: &str) -> Option<DateTime<Utc>> {
51 DateTime::parse_from_rfc3339(s)
52 .map(|dt| dt.with_timezone(&Utc))
53 .ok()
54}
55
56fn build_uri(did: &str, collection: &str, rkey: &str) -> String {
57 format!("at://{}/{}/{}", did, collection, rkey)
58}
59
60fn extract_cid_ref(record: &serde_json::Value, field: &str) -> Option<String> {
61 record
62 .get(field)
63 .and_then(|v| serde_json::from_value::<CidRef>(v.clone()).ok())
64 .map(|r| r.cid)
65}
66
67async fn process_record_event(
68 state: &Arc<AppState>,
69 event: TapRecordEvent,
70 tx: &broadcast::Sender<String>,
71) -> anyhow::Result<()> {
72 if event.action == "delete" {
73 return Ok(());
74 }
75
76 let record = match &event.record {
77 Some(r) => r,
78 None => return Ok(()),
79 };
80
81 let pool = state.db.pool();
82 let now = Utc::now();
83 let uri = build_uri(&event.did, &event.collection, &event.rkey);
84
85 if !event.collection.starts_with("systems.gmstn.development.") {
86 return Ok(());
87 }
88
89 let created_at = record
90 .get("createdAt")
91 .and_then(|v| v.as_str())
92 .and_then(parse_datetime)
93 .unwrap_or(now);
94
95 db::upsert_account(
96 pool,
97 &NewAccount {
98 did: event.did.clone(),
99 handle: event.did.clone(),
100 created_at: now,
101 },
102 )
103 .await?;
104
105 let (creator_did, target_did, ref_cids) = match event.collection.as_str() {
106 LATTICE_COLLECTION | SHARD_COLLECTION | CHANNEL_COLLECTION => {
107 (event.did.clone(), None, vec![])
108 }
109 INVITE_COLLECTION => {
110 let channel_cid = extract_cid_ref(record, "channel");
111 let recipient = record
112 .get("recipient")
113 .and_then(|v| v.as_str())
114 .map(String::from);
115
116 if let Some(ref recipient_did) = recipient {
117 db::upsert_account(
118 pool,
119 &NewAccount {
120 did: recipient_did.clone(),
121 handle: recipient_did.clone(),
122 created_at: now,
123 },
124 )
125 .await?;
126 }
127
128 let ref_cids = channel_cid.into_iter().collect();
129 (event.did.clone(), recipient, ref_cids)
130 }
131 MEMBERSHIP_COLLECTION => {
132 let channel_cid = extract_cid_ref(record, "channel");
133 let invite_cid = extract_cid_ref(record, "invite");
134
135 let ref_cids: Vec<String> = [channel_cid, invite_cid].into_iter().flatten().collect();
136
137 (event.did.clone(), Some(event.did.clone()), ref_cids)
138 }
139 _ => {
140 tracing::debug!(collection = %event.collection, "Unknown collection type");
141 return Ok(());
142 }
143 };
144
145 db::insert_record(
146 pool,
147 &NewRecord {
148 uri,
149 cid: event.cid,
150 collection: event.collection,
151 creator_did,
152 created_at,
153 indexed_at: now,
154 data: record.clone(),
155 target_did,
156 ref_cids,
157 },
158 )
159 .await?;
160
161 if let Ok(json) = serde_json::to_string(record) {
162 let _ = tx.send(json);
163 }
164
165 Ok(())
166}
167
168pub async fn run(url: String, state: Arc<AppState>, tx: broadcast::Sender<String>) {
169 loop {
170 tracing::info!("Connecting to TAP at {}", url);
171
172 match connect_async(&url).await {
173 Ok((ws_stream, _)) => {
174 tracing::info!("Connected to TAP");
175 let (mut write, mut read) = ws_stream.split();
176
177 while let Some(msg) = read.next().await {
178 match msg {
179 Ok(Message::Text(text)) => {
180 match serde_json::from_str::<TapMessage>(&text) {
181 Ok(msg) => {
182 let event_id = msg.id;
183
184 if msg.msg_type == "record" {
185 if let Some(record_event) = msg.record {
186 if let Err(e) = process_record_event(&state, record_event, &tx).await {
187 tracing::error!(error = %e, "Failed to process TAP event");
188 }
189 }
190 }
191
192 let ack = TapAck::new(event_id);
193 if let Ok(ack_json) = serde_json::to_string(&ack) {
194 if let Err(e) = write.send(Message::Text(ack_json.into())).await {
195 tracing::error!(error = %e, "Failed to send ack");
196 break;
197 }
198 }
199 }
200 Err(e) => {
201 tracing::warn!(error = %e, "Failed to parse TAP message");
202 }
203 }
204 }
205 Ok(Message::Ping(data)) => {
206 if let Err(e) = write.send(Message::Pong(data)).await {
207 tracing::error!(error = %e, "Failed to send pong");
208 break;
209 }
210 }
211 Ok(Message::Close(_)) => {
212 tracing::info!("TAP connection closed");
213 break;
214 }
215 Err(e) => {
216 tracing::error!(error = %e, "TAP WebSocket error");
217 break;
218 }
219 _ => {}
220 }
221 }
222 }
223 Err(e) => {
224 tracing::error!(error = %e, "Failed to connect to TAP");
225 }
226 }
227
228 tracing::info!("Reconnecting to TAP in 5 seconds...");
229 tokio::time::sleep(Duration::from_secs(5)).await;
230 }
231}