···
1
+
use anyhow::{anyhow, Result};
2
+
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
3
+
use atrium_api::client::AtpServiceClient;
5
+
use atrium_api::types;
6
+
use atrium_xrpc_client::isahc::IsahcClient;
7
+
use futures::StreamExt;
8
+
use ipld_core::ipld::Ipld;
10
+
use std::sync::{Arc, Mutex};
11
+
use tokio_tungstenite::connect_async;
12
+
use tokio_tungstenite::tungstenite::Message;
14
+
use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
15
+
use indexmap::{IndexMap, IndexSet};
17
+
/// Frame header types for WebSocket messages
18
+
#[derive(Debug, Clone, PartialEq, Eq)]
20
+
Message(Option<String>),
24
+
impl TryFrom<Ipld> for FrameHeader {
25
+
type Error = anyhow::Error;
27
+
fn try_from(value: Ipld) -> Result<Self> {
28
+
if let Ipld::Map(map) = value {
29
+
if let Some(Ipld::Integer(i)) = map.get("op") {
32
+
let t = if let Some(Ipld::String(s)) = map.get("t") {
37
+
return Ok(FrameHeader::Message(t));
39
+
-1 => return Ok(FrameHeader::Error),
44
+
Err(anyhow!("invalid frame type"))
48
+
/// Frame types for parsed WebSocket messages
49
+
#[derive(Debug, Clone, PartialEq, Eq)]
51
+
Message(Option<String>, MessageFrame),
55
+
#[derive(Debug, Clone, PartialEq, Eq)]
56
+
pub struct MessageFrame {
60
+
#[derive(Debug, Clone, PartialEq, Eq)]
61
+
pub struct ErrorFrame {}
63
+
impl TryFrom<&[u8]> for Frame {
64
+
type Error = anyhow::Error;
66
+
fn try_from(value: &[u8]) -> Result<Self> {
67
+
let mut cursor = Cursor::new(value);
68
+
let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
69
+
Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
70
+
value.split_at(cursor.position() as usize)
73
+
return Err(anyhow!("invalid frame type"));
76
+
let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
77
+
if let FrameHeader::Message(t) = &header {
78
+
Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() }))
80
+
Ok(Frame::Error(ErrorFrame {}))
85
+
/// Subscribe to a repo's firehose and update inodes on changes
86
+
pub async fn subscribe_to_repo<R>(
89
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
90
+
sizes: Arc<Mutex<IndexMap<usize, u64>>>,
91
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
92
+
notifier: fuser::Notifier,
95
+
R: atrium_repo::blockstore::AsyncBlockStoreRead,
97
+
// Strip https:// or http:// prefix from PDS URL if present
98
+
let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://");
99
+
let url = format!("wss://{}/xrpc/{}", pds_host, NSID);
100
+
println!("Connecting to firehose: {}", url);
102
+
let (mut stream, _) = connect_async(url).await?;
103
+
println!("Connected to firehose for {}", did);
106
+
match stream.next().await {
107
+
Some(Ok(Message::Binary(data))) => {
108
+
if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) {
109
+
if t.as_str() == "#commit" {
110
+
if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) {
111
+
// Only process commits for our DID
112
+
if commit.repo.as_str() == did {
113
+
if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, ¬ifier).await {
114
+
eprintln!("Error handling commit: {:?}", e);
121
+
Some(Ok(_)) => {} // Ignore other message types
123
+
eprintln!("WebSocket error: {}", e);
127
+
eprintln!("WebSocket closed");
136
+
/// Handle a commit by updating the inode tree and notifying Finder
137
+
async fn handle_commit(
139
+
inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>,
140
+
sizes: &Arc<Mutex<IndexMap<usize, u64>>>,
141
+
content_cache: &Arc<Mutex<IndexMap<String, String>>>,
144
+
notifier: &fuser::Notifier,
146
+
// Find the DID inode
147
+
let did_entry = PdsFsEntry::Did(did.to_string());
149
+
let inodes_lock = inodes.lock().unwrap();
150
+
inodes_lock.get_index_of(&did_entry)
153
+
let Some(did_inode) = did_inode else {
154
+
return Err(anyhow!("DID not found in inodes"));
157
+
for op in &commit.ops {
158
+
let Some((collection, rkey)) = op.path.split_once('/') else {
162
+
match op.action.as_str() {
164
+
// Fetch the record from PDS
165
+
let record_key = format!("{}/{}", collection, rkey);
166
+
let cache_key = format!("{}/{}", did, record_key);
168
+
// Fetch record content from PDS
169
+
match fetch_record(pds, did, collection, rkey).await {
171
+
let content_len = content.len() as u64;
173
+
// Add the record to inodes
174
+
let (collection_inode, record_inode) = {
175
+
let mut inodes_lock = inodes.lock().unwrap();
177
+
// Ensure collection exists
178
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
180
+
nsid: collection.to_string(),
182
+
let (collection_inode, _) = inodes_lock.insert_full(collection_entry);
185
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
186
+
parent: collection_inode,
187
+
rkey: rkey.to_string(),
189
+
let (record_inode, _) = inodes_lock.insert_full(record_entry);
190
+
(collection_inode, record_inode)
193
+
// Cache the content and size
194
+
content_cache.lock().unwrap().insert(cache_key, content);
195
+
sizes.lock().unwrap().insert(record_inode, content_len);
197
+
// Notify Finder about the new file (release lock first)
198
+
let filename = format!("{}.json", rkey);
199
+
if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) {
200
+
eprintln!("Failed to invalidate entry for {}: {}", filename, e);
203
+
println!("Created: {}/{}", collection, rkey);
206
+
eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e);
211
+
// Get inodes before removing
212
+
let (collection_inode_opt, child_inode_opt) = {
213
+
let mut inodes_lock = inodes.lock().unwrap();
215
+
// Find the collection
216
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
218
+
nsid: collection.to_string(),
220
+
let collection_inode = inodes_lock.get_index_of(&collection_entry);
222
+
// Find and remove the record
223
+
let child_inode = if let Some(coll_ino) = collection_inode {
224
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
226
+
rkey: rkey.to_string(),
228
+
let child_ino = inodes_lock.get_index_of(&record_entry);
229
+
inodes_lock.shift_remove(&record_entry);
235
+
(collection_inode, child_inode)
238
+
// Notify Finder about the deletion (release lock first)
239
+
if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) {
240
+
// Remove from caches
241
+
sizes.lock().unwrap().shift_remove(&child_ino);
242
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
243
+
content_cache.lock().unwrap().shift_remove(&cache_key);
245
+
let filename = format!("{}.json", rkey);
246
+
if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) {
247
+
eprintln!("Failed to notify deletion for {}: {}", filename, e);
251
+
println!("Deleted: {}/{}", collection, rkey);
254
+
// For updates, invalidate the inode so content is re-fetched
255
+
let record_inode_opt = {
256
+
let inodes_lock = inodes.lock().unwrap();
257
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
259
+
nsid: collection.to_string(),
262
+
if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) {
263
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
264
+
parent: collection_inode,
265
+
rkey: rkey.to_string(),
267
+
inodes_lock.get_index_of(&record_entry)
273
+
// Notify Finder to invalidate the inode (release lock first)
274
+
if let Some(record_ino) = record_inode_opt {
275
+
// Clear caches so content is recalculated
276
+
sizes.lock().unwrap().shift_remove(&record_ino);
277
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
278
+
content_cache.lock().unwrap().shift_remove(&cache_key);
280
+
// Invalidate the entire inode (metadata and all data)
281
+
if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) {
282
+
eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e);
286
+
println!("Updated: {}/{}", collection, rkey);
295
+
/// Fetch a record from the PDS
296
+
async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> {
297
+
let client = AtpServiceClient::new(IsahcClient::new(pds));
298
+
let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?;
299
+
let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?;
300
+
let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?;
302
+
let response = client
307
+
.get_record(com::atproto::repo::get_record::Parameters::from(
308
+
com::atproto::repo::get_record::ParametersData {
310
+
collection: collection_nsid,
311
+
repo: types::string::AtIdentifier::Did(did),
317
+
Ok(serde_json::to_string_pretty(&response.value)?)