From 4bb72e43a23c8d781823650562557b03b5266e8f Mon Sep 17 00:00:00 2001 From: Dan Abramov Date: Mon, 13 Oct 2025 18:01:53 +0100 Subject: [PATCH] show files as .json --- src/fs.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/fs.rs b/src/fs.rs index a9a1524..631c320 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -190,7 +190,7 @@ where .block_on(repo.get_raw::(&key)) .ok() .flatten() - .map_or(0, |v| serde_json::to_string(&v).unwrap().len()) + .map_or(0, |v| serde_json::to_string_pretty(&v).unwrap().len()) as u64; let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; @@ -315,7 +315,7 @@ where .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { if let PdsFsEntry::Record(record) = e { if record.parent == ino as usize { - Some((i as u64, record.rkey.clone())) + Some((i as u64, format!("{}.json", record.rkey))) } else { None } @@ -377,9 +377,11 @@ where } } Some(PdsFsEntry::Collection(_)) => { + let name_str = name.to_string_lossy(); + let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string(); let record = PdsFsEntry::Record(PdsFsRecord { parent: parent as usize, - rkey: name.to_string_lossy().to_string(), + rkey, }); if let Some(ino) = self.inodes.get_index_of(&record) { reply.entry(&TTL, &self.attr(ino as u64), 0); @@ -410,7 +412,7 @@ where let key = format!("{}/{}", col.nsid, r.rkey); if let Ok(Some(val)) = rt.block_on(repo.get_raw::(&key)) { - reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); + reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]); return; } } -- 2.43.0 From e7ba090f4681d97f92205be3e7eab4dcc0d9a4cf Mon Sep 17 00:00:00 2001 From: Dan Abramov Date: Mon, 13 Oct 2025 19:14:22 +0100 Subject: [PATCH] initialize tokio once --- src/fs.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/fs.rs b/src/fs.rs index 631c320..8f9066b 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -30,6 +30,7 @@ fn tid_to_timestamp(tid: 
&str) -> Option { pub struct PdsFs { repos: IndexMap>, inodes: IndexSet, + rt: tokio::runtime::Runtime, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -112,6 +113,7 @@ where PdsFs { repos: Default::default(), inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]), + rt: tokio::runtime::Runtime::new().unwrap(), } } @@ -185,8 +187,8 @@ where let did = self.inodes[col.parent].unwrap_did(); let repo = &mut self.repos[did]; let key = format!("{}/{}", col.nsid, r.rkey); - let rt = tokio::runtime::Runtime::new().unwrap(); - let size = rt + let size = self + .rt .block_on(repo.get_raw::(&key)) .ok() .flatten() @@ -404,14 +406,13 @@ where _lock: Option, reply: fuser::ReplyData, ) { - let rt = tokio::runtime::Runtime::new().unwrap(); if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) { let col = self.inodes[r.parent].unwrap_collection(); let did = self.inodes[col.parent].unwrap_did(); let repo = &mut self.repos[did]; let key = format!("{}/{}", col.nsid, r.rkey); - if let Ok(Some(val)) = rt.block_on(repo.get_raw::(&key)) { + if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::(&key)) { reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]); return; } -- 2.43.0 From c9898260ddd47eabf2d3c10cbfbeadd3c1d851fb Mon Sep 17 00:00:00 2001 From: Dan Abramov Date: Mon, 13 Oct 2025 20:21:10 +0100 Subject: [PATCH] disable cache --- src/main.rs | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/src/main.rs b/src/main.rs index 30c006c..22a1e64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,6 @@ use std::{ path::PathBuf, sync::Arc, }; -use xdg::BaseDirectories; fn main() { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -117,29 +116,9 @@ async fn cached_download( pb.enable_steady_tick(std::time::Duration::from_millis(100)); pb = m.add(pb); - let dirs = BaseDirectories::new(); - - let dir = dirs - .get_cache_home() - .expect("$HOME is absent") - .join("pdsfs"); - 
tokio::fs::create_dir_all(&dir).await?; - - let file = dir.join(&id.did); - let exists = std::fs::exists(&file)?; - - let bytes = if !exists { - pb.set_message(format!("downloading CAR file for...{}", id.did)); - download_car_file(id, &pb).await? - } else { - pb.set_message(format!("using cached CAR file for...{}", id.did)); - tokio::fs::read(&file).await? - }; - - // write to disk - if !exists { - tokio::fs::write(&file, &bytes).await?; - } + // Always download fresh - no caching for now to ensure up-to-date data + pb.set_message(format!("downloading CAR file for...{}", id.did)); + let bytes = download_car_file(id, &pb).await?; pb.finish(); Ok(bytes) -- 2.43.0 From 15b22fe26461a37e83785b1c14772a1398170567 Mon Sep 17 00:00:00 2001 From: Dan Abramov Date: Tue, 14 Oct 2025 00:12:21 +0100 Subject: [PATCH] watch firehose --- Cargo.lock | 108 ++++++++++++++-- Cargo.toml | 6 +- src/firehose.rs | 318 ++++++++++++++++++++++++++++++++++++++++++++++++ src/fs.rs | 178 ++++++++++++++++++--------- src/main.rs | 51 ++++++-- 5 files changed, 585 insertions(+), 76 deletions(-) create mode 100644 src/firehose.rs diff --git a/Cargo.lock b/Cargo.lock index fd5e8bc..ae76942 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + [[package]] name = "async-channel" version = "1.9.0" @@ -328,6 +334,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -669,7 +681,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -991,7 +1003,7 @@ dependencies = [ "idna", "ipnet", "once_cell", - "rand", + "rand 0.9.2", "ring", "thiserror 2.0.12", "tinyvec", @@ -1013,7 +1025,7 @@ dependencies = [ "moka", "once_cell", "parking_lot", - "rand", + "rand 0.9.2", "resolv-conf", "smallvec", "thiserror 2.0.12", @@ -1757,6 +1769,7 @@ dependencies = [ name = "pdsfs" version = "0.1.0" dependencies = [ + "anyhow", "atrium-api", "atrium-common", "atrium-identity", @@ -1776,6 +1789,7 @@ dependencies = [ "serde_json", "thiserror 2.0.12", "tokio", + "tokio-tungstenite", "xdg", ] @@ -1887,14 +1901,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -1904,7 +1939,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 
0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -2057,7 +2101,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2233,6 +2277,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2514,6 +2569,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -2662,6 +2731,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "native-tls", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "typenum" version = "1.18.0" @@ -2713,6 +2801,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index d899d0f..bf2d85f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +anyhow = "1.0" atrium-api = "0.25.4" atrium-common = "0.1.2" atrium-identity = "0.1.5" @@ -11,7 +12,7 @@ atrium-repo = "0.1.4" atrium-xrpc = "0.12.3" atrium-xrpc-client = { version = "0.5.14", features=["isahc"] } clap = { version = "4.5.41", features = ["cargo"] } -fuser = "0.15.1" +fuser = { version = "0.15.1", features = ["abi-7-18"] } futures = "0.3.31" hickory-resolver = "0.25.2" indexmap = "2.10.0" @@ -22,5 +23,6 @@ reqwest = "0.12.22" serde_ipld_dagcbor = "0.6.3" serde_json = "1.0.141" thiserror = "2.0.12" -tokio = { version = "1.46.1", features = ["fs"] } +tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] } +tokio-tungstenite = { version = "0.24", features = ["native-tls"] } xdg = "3.0.0" diff --git a/src/firehose.rs b/src/firehose.rs new file mode 100644 index 0000000..29c0e47 --- /dev/null +++ b/src/firehose.rs @@ -0,0 +1,318 @@ +use anyhow::{anyhow, Result}; +use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; +use atrium_api::client::AtpServiceClient; +use atrium_api::com; +use atrium_api::types; +use atrium_xrpc_client::isahc::IsahcClient; +use futures::StreamExt; +use ipld_core::ipld::Ipld; +use std::io::Cursor; +use std::sync::{Arc, Mutex}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord}; +use indexmap::{IndexMap, IndexSet}; + +/// Frame header types for WebSocket messages +#[derive(Debug, Clone, PartialEq, Eq)] +enum FrameHeader { + Message(Option), + Error, +} + +impl TryFrom for FrameHeader { + type Error = anyhow::Error; + + fn try_from(value: Ipld) -> Result { + if let Ipld::Map(map) = value { + if let Some(Ipld::Integer(i)) = 
map.get("op") { + match i { + 1 => { + let t = if let Some(Ipld::String(s)) = map.get("t") { + Some(s.clone()) + } else { + None + }; + return Ok(FrameHeader::Message(t)); + } + -1 => return Ok(FrameHeader::Error), + _ => {} + } + } + } + Err(anyhow!("invalid frame type")) + } +} + +/// Frame types for parsed WebSocket messages +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Frame { + Message(Option, MessageFrame), + Error(ErrorFrame), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MessageFrame { + pub body: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ErrorFrame {} + +impl TryFrom<&[u8]> for Frame { + type Error = anyhow::Error; + + fn try_from(value: &[u8]) -> Result { + let mut cursor = Cursor::new(value); + let (left, right) = match serde_ipld_dagcbor::from_reader::(&mut cursor) { + Err(serde_ipld_dagcbor::DecodeError::TrailingData) => { + value.split_at(cursor.position() as usize) + } + _ => { + return Err(anyhow!("invalid frame type")); + } + }; + let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::(left)?)?; + if let FrameHeader::Message(t) = &header { + Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() })) + } else { + Ok(Frame::Error(ErrorFrame {})) + } + } +} + +/// Subscribe to a repo's firehose and update inodes on changes +pub async fn subscribe_to_repo( + did: String, + pds: String, + inodes: Arc>>, + sizes: Arc>>, + content_cache: Arc>>, + notifier: fuser::Notifier, +) -> Result<()> +where + R: atrium_repo::blockstore::AsyncBlockStoreRead, +{ + // Strip https:// or http:// prefix from PDS URL if present + let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); + let url = format!("wss://{}/xrpc/{}", pds_host, NSID); + println!("Connecting to firehose: {}", url); + + let (mut stream, _) = connect_async(url).await?; + println!("Connected to firehose for {}", did); + + loop { + match stream.next().await { + Some(Ok(Message::Binary(data))) => { + if let 
Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { + if t.as_str() == "#commit" { + if let Ok(commit) = serde_ipld_dagcbor::from_reader::(msg.body.as_slice()) { + // Only process commits for our DID + if commit.repo.as_str() == did { + if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, ¬ifier).await { + eprintln!("Error handling commit: {:?}", e); + } + } + } + } + } + } + Some(Ok(_)) => {} // Ignore other message types + Some(Err(e)) => { + eprintln!("WebSocket error: {}", e); + break; + } + None => { + eprintln!("WebSocket closed"); + break; + } + } + } + + Ok(()) +} + +/// Handle a commit by updating the inode tree and notifying Finder +async fn handle_commit( + commit: &Commit, + inodes: &Arc>>, + sizes: &Arc>>, + content_cache: &Arc>>, + did: &str, + pds: &str, + notifier: &fuser::Notifier, +) -> Result<()> { + // Find the DID inode + let did_entry = PdsFsEntry::Did(did.to_string()); + let did_inode = { + let inodes_lock = inodes.lock().unwrap(); + inodes_lock.get_index_of(&did_entry) + }; + + let Some(did_inode) = did_inode else { + return Err(anyhow!("DID not found in inodes")); + }; + + for op in &commit.ops { + let Some((collection, rkey)) = op.path.split_once('/') else { + continue; + }; + + match op.action.as_str() { + "create" => { + // Fetch the record from PDS + let record_key = format!("{}/{}", collection, rkey); + let cache_key = format!("{}/{}", did, record_key); + + // Fetch record content from PDS + match fetch_record(pds, did, collection, rkey).await { + Ok(content) => { + let content_len = content.len() as u64; + + // Add the record to inodes + let (collection_inode, record_inode) = { + let mut inodes_lock = inodes.lock().unwrap(); + + // Ensure collection exists + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { + parent: did_inode, + nsid: collection.to_string(), + }); + let (collection_inode, _) = inodes_lock.insert_full(collection_entry); + + // Add the record + let 
record_entry = PdsFsEntry::Record(PdsFsRecord { + parent: collection_inode, + rkey: rkey.to_string(), + }); + let (record_inode, _) = inodes_lock.insert_full(record_entry); + (collection_inode, record_inode) + }; + + // Cache the content and size + content_cache.lock().unwrap().insert(cache_key, content); + sizes.lock().unwrap().insert(record_inode, content_len); + + // Notify Finder about the new file (release lock first) + let filename = format!("{}.json", rkey); + if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { + eprintln!("Failed to invalidate entry for {}: {}", filename, e); + } + + println!("Created: {}/{}", collection, rkey); + } + Err(e) => { + eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); + } + } + } + "delete" => { + // Get inodes before removing + let (collection_inode_opt, child_inode_opt) = { + let mut inodes_lock = inodes.lock().unwrap(); + + // Find the collection + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { + parent: did_inode, + nsid: collection.to_string(), + }); + let collection_inode = inodes_lock.get_index_of(&collection_entry); + + // Find and remove the record + let child_inode = if let Some(coll_ino) = collection_inode { + let record_entry = PdsFsEntry::Record(PdsFsRecord { + parent: coll_ino, + rkey: rkey.to_string(), + }); + let child_ino = inodes_lock.get_index_of(&record_entry); + inodes_lock.shift_remove(&record_entry); + child_ino + } else { + None + }; + + (collection_inode, child_inode) + }; + + // Notify Finder about the deletion (release lock first) + if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) { + // Remove from caches + sizes.lock().unwrap().shift_remove(&child_ino); + let cache_key = format!("{}/{}/{}", did, collection, rkey); + content_cache.lock().unwrap().shift_remove(&cache_key); + + let filename = format!("{}.json", rkey); + if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) 
{ + eprintln!("Failed to notify deletion for {}: {}", filename, e); + } + } + + println!("Deleted: {}/{}", collection, rkey); + } + "update" => { + // For updates, invalidate the inode so content is re-fetched + let record_inode_opt = { + let inodes_lock = inodes.lock().unwrap(); + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { + parent: did_inode, + nsid: collection.to_string(), + }); + + if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { + let record_entry = PdsFsEntry::Record(PdsFsRecord { + parent: collection_inode, + rkey: rkey.to_string(), + }); + inodes_lock.get_index_of(&record_entry) + } else { + None + } + }; + + // Notify Finder to invalidate the inode (release lock first) + if let Some(record_ino) = record_inode_opt { + // Clear caches so content is recalculated + sizes.lock().unwrap().shift_remove(&record_ino); + let cache_key = format!("{}/{}/{}", did, collection, rkey); + content_cache.lock().unwrap().shift_remove(&cache_key); + + // Invalidate the entire inode (metadata and all data) + if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { + eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); + } + } + + println!("Updated: {}/{}", collection, rkey); + } + _ => {} + } + } + + Ok(()) +} + +/// Fetch a record from the PDS +async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result { + let client = AtpServiceClient::new(IsahcClient::new(pds)); + let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; + let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; + let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; + + let response = client + .service + .com + .atproto + .repo + .get_record(com::atproto::repo::get_record::Parameters::from( + com::atproto::repo::get_record::ParametersData { + cid: None, + collection: collection_nsid, + repo: 
types::string::AtIdentifier::Did(did), + rkey: record_key, + } + )) + .await?; + + Ok(serde_json::to_string_pretty(&response.value)?) +} diff --git a/src/fs.rs b/src/fs.rs index 8f9066b..b1fce09 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,3 +1,4 @@ +use std::sync::{Arc, Mutex}; use std::time::{self, SystemTime, UNIX_EPOCH, Duration}; use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead}; @@ -28,8 +29,10 @@ fn tid_to_timestamp(tid: &str) -> Option { } pub struct PdsFs { - repos: IndexMap>, - inodes: IndexSet, + repos: Arc>>>, + inodes: Arc>>, + sizes: Arc>>, + content_cache: Arc>>, rt: tokio::runtime::Runtime, } @@ -68,14 +71,14 @@ impl PdsFsEntry { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PdsFsCollection { - parent: Inode, - nsid: String, + pub parent: Inode, + pub nsid: String, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PdsFsRecord { - parent: Inode, - rkey: String, + pub parent: Inode, + pub rkey: String, } // impl PdsFsRecord { @@ -111,28 +114,37 @@ where { pub fn new() -> Self { PdsFs { - repos: Default::default(), - inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]), + repos: Arc::new(Mutex::new(Default::default())), + inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))), + sizes: Arc::new(Mutex::new(Default::default())), + content_cache: Arc::new(Mutex::new(Default::default())), rt: tokio::runtime::Runtime::new().unwrap(), } } + pub fn get_shared_state(&self) -> (Arc>>>, Arc>>, Arc>>, Arc>>) { + (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache)) + } + pub async fn add(&mut self, did: String, mut repo: Repository) { let mut mst = repo.tree(); - let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone())); + let did_inode = { + let mut inodes = self.inodes.lock().unwrap(); + let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone())); + did_inode + }; let mut keys = Box::pin(mst.keys()); while let 
Some(Ok(key)) = keys.next().await { if let Some((collection_name, rkey)) = key.split_once("/") { - let (collection_inode, _) = - self.inodes - .insert_full(PdsFsEntry::Collection(PdsFsCollection { - parent: did_inode, - nsid: collection_name.to_owned(), - })); - - self.inodes.insert(PdsFsEntry::Record(PdsFsRecord { + let mut inodes = self.inodes.lock().unwrap(); + let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection { + parent: did_inode, + nsid: collection_name.to_owned(), + })); + + inodes.insert(PdsFsEntry::Record(PdsFsRecord { parent: collection_inode, rkey: rkey.to_owned(), })); @@ -142,11 +154,12 @@ where drop(keys); drop(mst); - self.repos.insert(did, repo); + self.repos.lock().unwrap().insert(did, repo); } fn attr(&mut self, ino: u64) -> fuser::FileAttr { - match self.inodes.get_index(ino as usize) { + let inodes = self.inodes.lock().unwrap(); + match inodes.get_index(ino as usize) { Some(PdsFsEntry::Root) => ROOTDIR_ATTR, Some(PdsFsEntry::Collection(_)) => fuser::FileAttr { ino, @@ -183,21 +196,39 @@ where blksize: BLKSIZE, }, Some(PdsFsEntry::Record(r)) => { - let col = self.inodes[r.parent].unwrap_collection(); - let did = self.inodes[col.parent].unwrap_did(); - let repo = &mut self.repos[did]; - let key = format!("{}/{}", col.nsid, r.rkey); - let size = self - .rt - .block_on(repo.get_raw::(&key)) - .ok() - .flatten() - .map_or(0, |v| serde_json::to_string_pretty(&v).unwrap().len()) - as u64; + let col = inodes[r.parent].unwrap_collection(); + let did = inodes[col.parent].unwrap_did().clone(); + let rkey = r.rkey.clone(); + let collection_nsid = col.nsid.clone(); + drop(inodes); + + // Check cache first + let size = { + let sizes = self.sizes.lock().unwrap(); + if let Some(&cached_size) = sizes.get(&(ino as usize)) { + cached_size + } else { + drop(sizes); + // Not in cache, try to fetch from repo + let mut repos = self.repos.lock().unwrap(); + let repo = &mut repos[&did]; + let key = format!("{}/{}", collection_nsid, 
rkey); + let size = self + .rt + .block_on(repo.get_raw::(&key)) + .ok() + .flatten() + .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len()) + as u64; + // Cache it for next time + self.sizes.lock().unwrap().insert(ino as usize, size); + size + } + }; let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; // Decode TID to get creation timestamp - let timestamp = tid_to_timestamp(&r.rkey).unwrap_or(time::UNIX_EPOCH); + let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH); fuser::FileAttr { ino, @@ -214,7 +245,7 @@ where gid: 20, rdev: 0, flags: 0, - blksize: 512, + blksize: BLKSIZE, } } _ => panic!("zero"), @@ -233,7 +264,8 @@ where _fh: Option, reply: fuser::ReplyAttr, ) { - if (ino as usize) < self.inodes.len() { + let len = self.inodes.lock().unwrap().len(); + if (ino as usize) < len { reply.attr(&TTL, &self.attr(ino as u64)) } else { reply.error(libc::ENOENT) @@ -248,11 +280,12 @@ where offset: i64, mut reply: fuser::ReplyDirectory, ) { - match self.inodes.get_index(ino as usize) { + let inodes = self.inodes.lock().unwrap(); + match inodes.get_index(ino as usize) { Some(PdsFsEntry::Root) => { let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())] .into_iter() - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { if let PdsFsEntry::Did(did) = e { Some((i as u64, did.clone())) } else { @@ -260,6 +293,7 @@ where } })) .collect(); + drop(inodes); for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) @@ -276,9 +310,9 @@ where reply.ok() } Some(PdsFsEntry::Did(_)) => { - let entries = vec![(ino, ".".to_string()), (1, "..".to_string())] + let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())] .into_iter() - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { if let PdsFsEntry::Collection(col) = e { if col.parent == ino as usize { 
Some((i as u64, col.nsid.clone())) @@ -289,11 +323,10 @@ where None } })) - .into_iter() - .enumerate() - .skip(offset as usize); + .collect(); + drop(inodes); - for (index, (inode_num, name)) in entries { + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { let full = reply.add( inode_num, (index + 1) as i64, @@ -312,9 +345,10 @@ where reply.ok(); } Some(PdsFsEntry::Collection(c)) => { - let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())] + let parent_ino = c.parent; + let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())] .into_iter() - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { if let PdsFsEntry::Record(record) = e { if record.parent == ino as usize { Some((i as u64, format!("{}.json", record.rkey))) @@ -325,11 +359,10 @@ where None } })) - .into_iter() - .enumerate() - .skip(offset as usize); + .collect(); + drop(inodes); - for (index, (inode_num, name)) in entries { + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { let full = reply.add( inode_num, (index + 1) as i64, @@ -347,7 +380,10 @@ where reply.ok() } - _ => reply.error(libc::ENOENT), + _ => { + drop(inodes); + reply.error(libc::ENOENT) + } } } @@ -358,12 +394,15 @@ where name: &std::ffi::OsStr, reply: fuser::ReplyEntry, ) { - match self.inodes.get_index(parent as usize) { + let inodes = self.inodes.lock().unwrap(); + match inodes.get_index(parent as usize) { Some(PdsFsEntry::Root) => { let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); - if let Some(ino) = self.inodes.get_index_of(&did) { + if let Some(ino) = inodes.get_index_of(&did) { + drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { + drop(inodes); reply.error(libc::ENOENT) } } @@ -372,9 +411,11 @@ where parent: parent as usize, nsid: name.to_string_lossy().to_string(), }); - if let Some(ino) = 
self.inodes.get_index_of(&col) { + if let Some(ino) = inodes.get_index_of(&col) { + drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { + drop(inodes); reply.error(libc::ENOENT) } } @@ -385,13 +426,18 @@ where parent: parent as usize, rkey, }); - if let Some(ino) = self.inodes.get_index_of(&record) { + if let Some(ino) = inodes.get_index_of(&record) { + drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { + drop(inodes); reply.error(libc::ENOENT) } } - _ => reply.error(libc::ENOENT), + _ => { + drop(inodes); + reply.error(libc::ENOENT) + } } } @@ -406,16 +452,32 @@ where _lock: Option, reply: fuser::ReplyData, ) { - if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) { - let col = self.inodes[r.parent].unwrap_collection(); - let did = self.inodes[col.parent].unwrap_did(); - let repo = &mut self.repos[did]; + let inodes = self.inodes.lock().unwrap(); + if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) { + let col = inodes[r.parent].unwrap_collection(); + let did = inodes[col.parent].unwrap_did().clone(); let key = format!("{}/{}", col.nsid, r.rkey); + let cache_key = format!("{}/{}", did, key); + drop(inodes); + + // Check content cache first (for new records from firehose) + { + let cache = self.content_cache.lock().unwrap(); + if let Some(content) = cache.get(&cache_key) { + reply.data(&content.as_bytes()[offset as usize..]); + return; + } + } + // Fall back to repo + let mut repos = self.repos.lock().unwrap(); + let repo = &mut repos[&did]; if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::(&key)) { - reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]); + reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]); return; } + } else { + drop(inodes); } reply.error(libc::ENOENT); } diff --git a/src/main.rs b/src/main.rs index 22a1e64..f54faf9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod client; mod error; +mod
firehose; mod fs; mod resolver; @@ -12,7 +13,6 @@ use fuser::MountOption; use futures::{StreamExt, stream}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use std::{ - collections::HashMap, io::{Cursor, Write}, path::PathBuf, sync::Arc, @@ -58,7 +58,7 @@ fn main() { let id = r.resolve(&h).await?; let bytes = cached_download(&id, &b).await?; let repo = build_repo(bytes).await?; - Ok::<_, error::Error>((id.did, repo)) + Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) } }) .collect::>(), @@ -67,17 +67,27 @@ fn main() { for e in errors { eprintln!("{:?}", e.as_ref().unwrap_err()); } - let repos = success + let repos_with_pds: Vec<_> = success .into_iter() .map(|s| s.unwrap()) - .collect::>(); + .collect(); // construct the fs let mut fs = fs::PdsFs::new(); - for (did, repo) in repos { + + // Extract (did, pds) pairs for WebSocket tasks before consuming repos + let did_pds_pairs: Vec<_> = repos_with_pds.iter() + .map(|(did, pds, _)| (did.clone(), pds.clone())) + .collect(); + + // Consume repos_with_pds to add repos to filesystem + for (did, _, repo) in repos_with_pds { rt.block_on(fs.add(did, repo)) } + // get shared state for WebSocket tasks + let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); + // mount let options = vec![ MountOption::RO, @@ -86,7 +96,33 @@ fn main() { MountOption::CUSTOM("local".to_string()), MountOption::CUSTOM("volname=pdsfs".to_string()), ]; - let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap(); + + // Create session and get notifier for Finder refresh + let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); + let notifier = session.notifier(); + let _bg = session.spawn().unwrap(); + + // spawn WebSocket subscription tasks for each DID using the runtime handle + let rt_handle = rt.handle().clone(); + for (did, pds) in did_pds_pairs { + let inodes_clone = Arc::clone(&inodes_arc); + let sizes_clone = Arc::clone(&sizes_arc); + let content_cache_clone = 
Arc::clone(&content_cache_arc); + let notifier_clone = notifier.clone(); + + rt_handle.spawn(async move { + if let Err(e) = firehose::subscribe_to_repo::>>>( + did, + pds, + inodes_clone, + sizes_clone, + content_cache_clone, + notifier_clone, + ).await { + eprintln!("WebSocket error: {:?}", e); + } + }); + } println!("mounted at {mountpoint:?}"); print!("hit enter to unmount and exit..."); @@ -96,9 +132,6 @@ fn main() { let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); - join_handle.join(); - std::fs::remove_dir(&mountpoint).unwrap(); - println!("unmounted {mountpoint:?}"); } -- 2.43.0