relay filter/appview bootstrap
at main 7.3 kB view raw
1use std::sync::Arc; 2use std::time::Duration; 3 4use chrono::{DateTime, Utc}; 5use futures::{SinkExt, StreamExt}; 6use serde::Deserialize; 7use tokio::sync::broadcast; 8use tokio_tungstenite::{connect_async, tungstenite::Message}; 9 10use crate::db; 11use crate::records::{ 12 CidRef, NewAccount, NewRecord, CHANNEL_COLLECTION, INVITE_COLLECTION, LATTICE_COLLECTION, 13 MEMBERSHIP_COLLECTION, SHARD_COLLECTION, 14}; 15use crate::AppState; 16 17#[derive(Debug, Deserialize)] 18pub struct TapMessage { 19 pub id: u64, 20 #[serde(rename = "type")] 21 pub msg_type: String, 22 pub record: Option<TapRecordEvent>, 23} 24 25#[derive(Debug, serde::Serialize)] 26struct TapAck { 27 #[serde(rename = "type")] 28 msg_type: &'static str, 29 id: u64, 30} 31 32impl TapAck { 33 fn new(id: u64) -> Self { 34 Self { msg_type: "ack", id } 35 } 36} 37 38#[derive(Debug, Deserialize)] 39pub struct TapRecordEvent { 40 pub did: String, 41 pub collection: String, 42 pub rkey: String, 43 pub cid: String, 44 pub action: String, 45 pub record: Option<serde_json::Value>, 46 #[serde(default)] 47 pub live: bool, 48} 49 50fn parse_datetime(s: &str) -> Option<DateTime<Utc>> { 51 DateTime::parse_from_rfc3339(s) 52 .map(|dt| dt.with_timezone(&Utc)) 53 .ok() 54} 55 56fn build_uri(did: &str, collection: &str, rkey: &str) -> String { 57 format!("at://{}/{}/{}", did, collection, rkey) 58} 59 60fn extract_cid_ref(record: &serde_json::Value, field: &str) -> Option<String> { 61 record 62 .get(field) 63 .and_then(|v| serde_json::from_value::<CidRef>(v.clone()).ok()) 64 .map(|r| r.cid) 65} 66 67async fn process_record_event( 68 state: &Arc<AppState>, 69 event: TapRecordEvent, 70 tx: &broadcast::Sender<String>, 71) -> anyhow::Result<()> { 72 if event.action == "delete" { 73 return Ok(()); 74 } 75 76 let record = match &event.record { 77 Some(r) => r, 78 None => return Ok(()), 79 }; 80 81 let pool = state.db.pool(); 82 let now = Utc::now(); 83 let uri = build_uri(&event.did, &event.collection, &event.rkey); 84 85 if !event.collection.starts_with("systems.gmstn.development.") { 86 return Ok(()); 87 } 88 89 let created_at = record 90 .get("createdAt") 91 .and_then(|v| v.as_str()) 92 .and_then(parse_datetime) 93 .unwrap_or(now); 94 95 db::upsert_account( 96 pool, 97 &NewAccount { 98 did: event.did.clone(), 99 handle: event.did.clone(), 100 created_at: now, 101 }, 102 ) 103 .await?; 104 105 let (creator_did, target_did, ref_cids) = match event.collection.as_str() { 106 LATTICE_COLLECTION | SHARD_COLLECTION | CHANNEL_COLLECTION => { 107 (event.did.clone(), None, vec![]) 108 } 109 INVITE_COLLECTION => { 110 let channel_cid = extract_cid_ref(record, "channel"); 111 let recipient = record 112 .get("recipient") 113 .and_then(|v| v.as_str()) 114 .map(String::from); 115 116 if let Some(ref recipient_did) = recipient { 117 db::upsert_account( 118 pool, 119 &NewAccount { 120 did: recipient_did.clone(), 121 handle: recipient_did.clone(), 122 created_at: now, 123 }, 124 ) 125 .await?; 126 } 127 128 let ref_cids = channel_cid.into_iter().collect(); 129 (event.did.clone(), recipient, ref_cids) 130 } 131 MEMBERSHIP_COLLECTION => { 132 let channel_cid = extract_cid_ref(record, "channel"); 133 let invite_cid = extract_cid_ref(record, "invite"); 134 135 let ref_cids: Vec<String> = [channel_cid, invite_cid].into_iter().flatten().collect(); 136 137 (event.did.clone(), Some(event.did.clone()), ref_cids) 138 } 139 _ => { 140 tracing::debug!(collection = %event.collection, "Unknown collection type"); 141 return Ok(()); 142 } 143 }; 144 145 db::insert_record( 146 pool, 147 &NewRecord { 148 uri, 149 cid: event.cid, 150 collection: event.collection, 151 creator_did, 152 created_at, 153 indexed_at: now, 154 data: record.clone(), 155 target_did, 156 ref_cids, 157 }, 158 ) 159 .await?; 160 161 if let Ok(json) = serde_json::to_string(record) { 162 let _ = tx.send(json); 163 } 164 165 Ok(()) 166} 167 168pub async fn run(url: String, state: Arc<AppState>, tx: broadcast::Sender<String>) { 169 loop { 170 tracing::info!("Connecting to TAP at {}", url); 171 172 match connect_async(&url).await { 173 Ok((ws_stream, _)) => { 174 tracing::info!("Connected to TAP"); 175 let (mut write, mut read) = ws_stream.split(); 176 177 while let Some(msg) = read.next().await { 178 match msg { 179 Ok(Message::Text(text)) => { 180 match serde_json::from_str::<TapMessage>(&text) { 181 Ok(msg) => { 182 let event_id = msg.id; 183 184 if msg.msg_type == "record" { 185 if let Some(record_event) = msg.record { 186 if let Err(e) = process_record_event(&state, record_event, &tx).await { 187 tracing::error!(error = %e, "Failed to process TAP event"); 188 } 189 } 190 } 191 192 let ack = TapAck::new(event_id); 193 if let Ok(ack_json) = serde_json::to_string(&ack) { 194 if let Err(e) = write.send(Message::Text(ack_json.into())).await { 195 tracing::error!(error = %e, "Failed to send ack"); 196 break; 197 } 198 } 199 } 200 Err(e) => { 201 tracing::warn!(error = %e, "Failed to parse TAP message"); 202 } 203 } 204 } 205 Ok(Message::Ping(data)) => { 206 if let Err(e) = write.send(Message::Pong(data)).await { 207 tracing::error!(error = %e, "Failed to send pong"); 208 break; 209 } 210 } 211 Ok(Message::Close(_)) => { 212 tracing::info!("TAP connection closed"); 213 break; 214 } 215 Err(e) => { 216 tracing::error!(error = %e, "TAP WebSocket error"); 217 break; 218 } 219 _ => {} 220 } 221 } 222 } 223 Err(e) => { 224 tracing::error!(error = %e, "Failed to connect to TAP"); 225 } 226 } 227 228 tracing::info!("Reconnecting to TAP in 5 seconds..."); 229 tokio::time::sleep(Duration::from_secs(5)).await; 230 } 231}