Watch live changes from the firehose #11

closed
opened by danabra.mov targeting main from danabra.mov/pdsfs:firehose
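This PR adds a firehose subscriber that keeps the mounted filesystem in sync with live repo changes: `src/firehose.rs` connects to the PDS over WebSocket, decodes `com.atproto.sync.subscribeRepos` frames, and applies `#commit` ops to the inode table, while `src/fs.rs` moves the filesystem state behind `Arc<Mutex<...>>` so the FUSE mount and the subscriber task can share it. A minimal sketch of the caller-side wiring, which is not part of this diff (assume `fs` is a populated `PdsFs`, `notifier` is the `fuser::Notifier` for the active mount session, and `Store` stands in for whatever `AsyncBlockStoreRead` implementation backs the repos):

```rust
// Hypothetical wiring, not part of this diff: hand the shared state and the
// mount's notifier to the firehose subscriber on the tokio runtime.
let (_repos, inodes, sizes, content_cache) = fs.get_shared_state();
tokio::spawn(async move {
    // subscribe_to_repo loops over firehose frames, filters commits to `did`,
    // and uses `notifier` to invalidate kernel caches as records change.
    if let Err(e) = firehose::subscribe_to_repo::<Store>(
        did, pds, inodes, sizes, content_cache, notifier,
    )
    .await
    {
        eprintln!("firehose subscription ended: {:?}", e);
    }
});
```

The `sync` and `rt-multi-thread` features added to tokio and the new `tokio-tungstenite` dependency in Cargo.toml exist to support this spawn.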
Changed files: +543 -67

Cargo.lock +101 -7
···
 "windows-sys 0.59.0",
]

+[[package]]
+name = "anyhow"
+version = "1.0.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
+
[[package]]
name = "async-channel"
version = "1.9.0"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"

+[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
[[package]]
name = "bytes"
version = "1.10.1"
···
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [
 "libc",
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]

[[package]]
···
 "idna",
 "ipnet",
 "once_cell",
- "rand",
+ "rand 0.9.2",
 "ring",
 "thiserror 2.0.12",
 "tinyvec",
···
 "moka",
 "once_cell",
 "parking_lot",
- "rand",
+ "rand 0.9.2",
 "resolv-conf",
 "smallvec",
 "thiserror 2.0.12",
···
name = "pdsfs"
version = "0.1.0"
dependencies = [
+ "anyhow",
 "atrium-api",
 "atrium-common",
 "atrium-identity",
···
 "serde_json",
 "thiserror 2.0.12",
 "tokio",
+ "tokio-tungstenite",
 "xdg",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"

+[[package]]
+name = "rand"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
+dependencies = [
+ "libc",
+ "rand_chacha 0.3.1",
+ "rand_core 0.6.4",
+]
+
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
- "rand_chacha",
- "rand_core",
+ "rand_chacha 0.9.0",
+ "rand_core 0.9.3",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
+dependencies = [
+ "ppv-lite86",
+ "rand_core 0.6.4",
]

[[package]]
···
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
 "ppv-lite86",
- "rand_core",
+ "rand_core 0.9.3",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
+dependencies = [
+ "getrandom 0.2.16",
]

[[package]]
···
 "errno",
 "libc",
 "linux-raw-sys",
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]

[[package]]
···
 "serde",

+[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
[[package]]
name = "sha2"
version = "0.10.9"
···
 "tokio",

+[[package]]
+name = "tokio-tungstenite"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
+dependencies = [
+ "futures-util",
+ "log",
+ "native-tls",
+ "tokio",
+ "tokio-native-tls",
+ "tungstenite",
+]
+
[[package]]
name = "tokio-util"
version = "0.7.15"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"

+[[package]]
+name = "tungstenite"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http 1.3.1",
+ "httparse",
+ "log",
+ "native-tls",
+ "rand 0.8.5",
+ "sha1",
+ "thiserror 1.0.69",
+ "utf-8",
+]
+
[[package]]
name = "typenum"
version = "1.18.0"
···
 "percent-encoding",

+[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
[[package]]
name = "utf8_iter"
version = "1.0.4"
Cargo.toml +4 -2
···
edition = "2024"

[dependencies]
+anyhow = "1.0"
atrium-api = "0.25.4"
atrium-common = "0.1.2"
atrium-identity = "0.1.5"
···
atrium-xrpc = "0.12.3"
atrium-xrpc-client = { version = "0.5.14", features=["isahc"] }
clap = { version = "4.5.41", features = ["cargo"] }
-fuser = "0.15.1"
+fuser = { version = "0.15.1", features = ["abi-7-18"] }
futures = "0.3.31"
hickory-resolver = "0.25.2"
indexmap = "2.10.0"
···
serde_ipld_dagcbor = "0.6.3"
serde_json = "1.0.141"
thiserror = "2.0.12"
-tokio = { version = "1.46.1", features = ["fs"] }
+tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] }
+tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
xdg = "3.0.0"
src/firehose.rs +318
···
+use anyhow::{anyhow, Result};
+use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
+use atrium_api::client::AtpServiceClient;
+use atrium_api::com;
+use atrium_api::types;
+use atrium_xrpc_client::isahc::IsahcClient;
+use futures::StreamExt;
+use ipld_core::ipld::Ipld;
+use std::io::Cursor;
+use std::sync::{Arc, Mutex};
+use tokio_tungstenite::connect_async;
+use tokio_tungstenite::tungstenite::Message;
+
+use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
+use indexmap::{IndexMap, IndexSet};
+
+/// Frame header types for WebSocket messages
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum FrameHeader {
+    Message(Option<String>),
+    Error,
+}
+
+impl TryFrom<Ipld> for FrameHeader {
+    type Error = anyhow::Error;
+
+    fn try_from(value: Ipld) -> Result<Self> {
+        if let Ipld::Map(map) = value {
+            if let Some(Ipld::Integer(i)) = map.get("op") {
+                match i {
+                    1 => {
+                        let t = if let Some(Ipld::String(s)) = map.get("t") {
+                            Some(s.clone())
+                        } else {
+                            None
+                        };
+                        return Ok(FrameHeader::Message(t));
+                    }
+                    -1 => return Ok(FrameHeader::Error),
+                    _ => {}
+                }
+            }
+        }
+        Err(anyhow!("invalid frame type"))
+    }
+}
+
+/// Frame types for parsed WebSocket messages
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Frame {
+    Message(Option<String>, MessageFrame),
+    Error(ErrorFrame),
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MessageFrame {
+    pub body: Vec<u8>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ErrorFrame {}
+
+impl TryFrom<&[u8]> for Frame {
+    type Error = anyhow::Error;
+
+    fn try_from(value: &[u8]) -> Result<Self> {
+        let mut cursor = Cursor::new(value);
+        let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
+            Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
+                value.split_at(cursor.position() as usize)
+            }
+            _ => {
+                return Err(anyhow!("invalid frame type"));
+            }
+        };
+        let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
+        if let FrameHeader::Message(t) = &header {
+            Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() }))
+        } else {
+            Ok(Frame::Error(ErrorFrame {}))
+        }
+    }
+}
+
+/// Subscribe to a repo's firehose and update inodes on changes
+pub async fn subscribe_to_repo<R>(
+    did: String,
+    pds: String,
+    inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
+    sizes: Arc<Mutex<IndexMap<usize, u64>>>,
+    content_cache: Arc<Mutex<IndexMap<String, String>>>,
+    notifier: fuser::Notifier,
+) -> Result<()>
+where
+    R: atrium_repo::blockstore::AsyncBlockStoreRead,
+{
+    // Strip https:// or http:// prefix from PDS URL if present
+    let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://");
+    let url = format!("wss://{}/xrpc/{}", pds_host, NSID);
+    println!("Connecting to firehose: {}", url);
+
+    let (mut stream, _) = connect_async(url).await?;
+    println!("Connected to firehose for {}", did);
+
+    loop {
+        match stream.next().await {
+            Some(Ok(Message::Binary(data))) => {
+                if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) {
+                    if t.as_str() == "#commit" {
+                        if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) {
+                            // Only process commits for our DID
+                            if commit.repo.as_str() == did {
+                                if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await {
+                                    eprintln!("Error handling commit: {:?}", e);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            Some(Ok(_)) => {} // Ignore other message types
+            Some(Err(e)) => {
+                eprintln!("WebSocket error: {}", e);
+                break;
+            }
+            None => {
+                eprintln!("WebSocket closed");
+                break;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Handle a commit by updating the inode tree and notifying Finder
+async fn handle_commit(
+    commit: &Commit,
+    inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>,
+    sizes: &Arc<Mutex<IndexMap<usize, u64>>>,
+    content_cache: &Arc<Mutex<IndexMap<String, String>>>,
+    did: &str,
+    pds: &str,
+    notifier: &fuser::Notifier,
+) -> Result<()> {
+    // Find the DID inode
+    let did_entry = PdsFsEntry::Did(did.to_string());
+    let did_inode = {
+        let inodes_lock = inodes.lock().unwrap();
+        inodes_lock.get_index_of(&did_entry)
+    };
+
+    let Some(did_inode) = did_inode else {
+        return Err(anyhow!("DID not found in inodes"));
+    };
+
+    for op in &commit.ops {
+        let Some((collection, rkey)) = op.path.split_once('/') else {
+            continue;
+        };
+
+        match op.action.as_str() {
+            "create" => {
+                // Fetch the record from PDS
+                let record_key = format!("{}/{}", collection, rkey);
+                let cache_key = format!("{}/{}", did, record_key);
+
+                // Fetch record content from PDS
+                match fetch_record(pds, did, collection, rkey).await {
+                    Ok(content) => {
+                        let content_len = content.len() as u64;
+
+                        // Add the record to inodes
+                        let (collection_inode, record_inode) = {
+                            let mut inodes_lock = inodes.lock().unwrap();
+
+                            // Ensure collection exists
+                            let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
+                                parent: did_inode,
+                                nsid: collection.to_string(),
+                            });
+                            let (collection_inode, _) = inodes_lock.insert_full(collection_entry);
+
+                            // Add the record
+                            let record_entry = PdsFsEntry::Record(PdsFsRecord {
+                                parent: collection_inode,
+                                rkey: rkey.to_string(),
+                            });
+                            let (record_inode, _) = inodes_lock.insert_full(record_entry);
+                            (collection_inode, record_inode)
+                        };
+
+                        // Cache the content and size
+                        content_cache.lock().unwrap().insert(cache_key, content);
+                        sizes.lock().unwrap().insert(record_inode, content_len);
+
+                        // Notify Finder about the new file (release lock first)
+                        let filename = format!("{}.json", rkey);
+                        if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) {
+                            eprintln!("Failed to invalidate entry for {}: {}", filename, e);
+                        }
+
+                        println!("Created: {}/{}", collection, rkey);
+                    }
+                    Err(e) => {
+                        eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e);
+                    }
+                }
+            }
+            "delete" => {
+                // Get inodes before removing
+                let (collection_inode_opt, child_inode_opt) = {
+                    let mut inodes_lock = inodes.lock().unwrap();
+
+                    // Find the collection
+                    let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
+                        parent: did_inode,
+                        nsid: collection.to_string(),
+                    });
+                    let collection_inode = inodes_lock.get_index_of(&collection_entry);
+
+                    // Find and remove the record
+                    let child_inode = if let Some(coll_ino) = collection_inode {
+                        let record_entry = PdsFsEntry::Record(PdsFsRecord {
+                            parent: coll_ino,
+                            rkey: rkey.to_string(),
+                        });
+                        let child_ino = inodes_lock.get_index_of(&record_entry);
+                        inodes_lock.shift_remove(&record_entry);
+                        child_ino
+                    } else {
+                        None
+                    };
+
+                    (collection_inode, child_inode)
+                };
+
+                // Notify Finder about the deletion (release lock first)
+                if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) {
+                    // Remove from caches
+                    sizes.lock().unwrap().shift_remove(&child_ino);
+                    let cache_key = format!("{}/{}/{}", did, collection, rkey);
+                    content_cache.lock().unwrap().shift_remove(&cache_key);
+
+                    let filename = format!("{}.json", rkey);
+                    if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) {
+                        eprintln!("Failed to notify deletion for {}: {}", filename, e);
+                    }
+                }
+
+                println!("Deleted: {}/{}", collection, rkey);
+            }
+            "update" => {
+                // For updates, invalidate the inode so content is re-fetched
+                let record_inode_opt = {
+                    let inodes_lock = inodes.lock().unwrap();
+                    let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
+                        parent: did_inode,
+                        nsid: collection.to_string(),
+                    });
+
+                    if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) {
+                        let record_entry = PdsFsEntry::Record(PdsFsRecord {
+                            parent: collection_inode,
+                            rkey: rkey.to_string(),
+                        });
+                        inodes_lock.get_index_of(&record_entry)
+                    } else {
+                        None
+                    }
+                };
+
+                // Notify Finder to invalidate the inode (release lock first)
+                if let Some(record_ino) = record_inode_opt {
+                    // Clear caches so content is recalculated
+                    sizes.lock().unwrap().shift_remove(&record_ino);
+                    let cache_key = format!("{}/{}/{}", did, collection, rkey);
+                    content_cache.lock().unwrap().shift_remove(&cache_key);
+
+                    // Invalidate the entire inode (metadata and all data)
+                    if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) {
+                        eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e);
+                    }
+                }
+
+                println!("Updated: {}/{}", collection, rkey);
+            }
+            _ => {}
+        }
+    }
+
+    Ok(())
+}
+
+/// Fetch a record from the PDS
+async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> {
+    let client = AtpServiceClient::new(IsahcClient::new(pds));
+    let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?;
+    let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?;
+    let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?;
+
+    let response = client
+        .service
+        .com
+        .atproto
+        .repo
+        .get_record(com::atproto::repo::get_record::Parameters::from(
+            com::atproto::repo::get_record::ParametersData {
+                cid: None,
+                collection: collection_nsid,
+                repo: types::string::AtIdentifier::Did(did),
+                rkey: record_key,
+            }
+        ))
+        .await?;
+
+    Ok(serde_json::to_string_pretty(&response.value)?)
+}
src/fs.rs +120 -58
···
+use std::sync::{Arc, Mutex};
use std::time::{self, SystemTime, UNIX_EPOCH, Duration};
use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
···
}

pub struct PdsFs<R> {
-    repos: IndexMap<String, Repository<R>>,
-    inodes: IndexSet<PdsFsEntry>,
+    repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
+    inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
+    sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
+    content_cache: Arc<Mutex<IndexMap<String, String>>>,
    rt: tokio::runtime::Runtime,
}
···
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsCollection {
-    parent: Inode,
-    nsid: String,
+    pub parent: Inode,
+    pub nsid: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsRecord {
-    parent: Inode,
-    rkey: String,
+    pub parent: Inode,
+    pub rkey: String,
}

// impl PdsFsRecord {
···
{
    pub fn new() -> Self {
        PdsFs {
-            repos: Default::default(),
-            inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]),
+            repos: Arc::new(Mutex::new(Default::default())),
+            inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))),
+            sizes: Arc::new(Mutex::new(Default::default())),
+            content_cache: Arc::new(Mutex::new(Default::default())),
            rt: tokio::runtime::Runtime::new().unwrap(),
        }
    }

+    pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) {
+        (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache))
+    }
+
    pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
        let mut mst = repo.tree();
-        let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone()));
+        let did_inode = {
+            let mut inodes = self.inodes.lock().unwrap();
+            let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
+            did_inode
+        };
        let mut keys = Box::pin(mst.keys());
        while let Some(Ok(key)) = keys.next().await {
            if let Some((collection_name, rkey)) = key.split_once("/") {
-                let (collection_inode, _) =
-                    self.inodes
-                        .insert_full(PdsFsEntry::Collection(PdsFsCollection {
-                            parent: did_inode,
-                            nsid: collection_name.to_owned(),
-                        }));
-
-                self.inodes.insert(PdsFsEntry::Record(PdsFsRecord {
+                let mut inodes = self.inodes.lock().unwrap();
+                let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
+                    parent: did_inode,
+                    nsid: collection_name.to_owned(),
+                }));
+
+                inodes.insert(PdsFsEntry::Record(PdsFsRecord {
                    parent: collection_inode,
                    rkey: rkey.to_owned(),
                }));
···
        drop(keys);
        drop(mst);
-        self.repos.insert(did, repo);
+        self.repos.lock().unwrap().insert(did, repo);
    }

    fn attr(&mut self, ino: u64) -> fuser::FileAttr {
-        match self.inodes.get_index(ino as usize) {
+        let inodes = self.inodes.lock().unwrap();
+        match inodes.get_index(ino as usize) {
            Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
            Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
                ino,
···
                blksize: BLKSIZE,
            },
            Some(PdsFsEntry::Record(r)) => {
-                let col = self.inodes[r.parent].unwrap_collection();
-                let did = self.inodes[col.parent].unwrap_did();
-                let repo = &mut self.repos[did];
-                let key = format!("{}/{}", col.nsid, r.rkey);
-                let size = self
-                    .rt
-                    .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
-                    .ok()
-                    .flatten()
-                    .map_or(0, |v| serde_json::to_string_pretty(&v).unwrap().len())
-                    as u64;
+                let col = inodes[r.parent].unwrap_collection();
+                let did = inodes[col.parent].unwrap_did().clone();
+                let rkey = r.rkey.clone();
+                let collection_nsid = col.nsid.clone();
+                drop(inodes);
+
+                // Check cache first
+                let size = {
+                    let sizes = self.sizes.lock().unwrap();
+                    if let Some(&cached_size) = sizes.get(&(ino as usize)) {
+                        cached_size
+                    } else {
+                        drop(sizes);
+                        // Not in cache, try to fetch from repo
+                        let mut repos = self.repos.lock().unwrap();
+                        let repo = &mut repos[&did];
+                        let key = format!("{}/{}", collection_nsid, rkey);
+                        let size = self
+                            .rt
+                            .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
+                            .ok()
+                            .flatten()
+                            .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
+                            as u64;
+                        // Cache it for next time
+                        self.sizes.lock().unwrap().insert(ino as usize, size);
+                        size
+                    }
+                };
                let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
                // Decode TID to get creation timestamp
-                let timestamp = tid_to_timestamp(&r.rkey).unwrap_or(time::UNIX_EPOCH);
+                let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);
                fuser::FileAttr {
                    ino,
···
                    gid: 20,
                    rdev: 0,
                    flags: 0,
-                    blksize: 512,
+                    blksize: BLKSIZE,
                }
            }
            _ => panic!("zero"),
···
        _fh: Option<u64>,
        reply: fuser::ReplyAttr,
    ) {
-        if (ino as usize) < self.inodes.len() {
+        let len = self.inodes.lock().unwrap().len();
+        if (ino as usize) < len {
            reply.attr(&TTL, &self.attr(ino as u64))
        } else {
            reply.error(libc::ENOENT)
···
        offset: i64,
        mut reply: fuser::ReplyDirectory,
    ) {
-        match self.inodes.get_index(ino as usize) {
+        let inodes = self.inodes.lock().unwrap();
+        match inodes.get_index(ino as usize) {
            Some(PdsFsEntry::Root) => {
                let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
                    .into_iter()
-                    .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
+                    .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
                        if let PdsFsEntry::Did(did) = e {
                            Some((i as u64, did.clone()))
                        } else {
···
                        }
                    }))
                    .collect();
+                drop(inodes);
                for (index, (inode_num, name)) in
                    entries.into_iter().enumerate().skip(offset as usize)
···
                reply.ok()
            }
            Some(PdsFsEntry::Did(_)) => {
-                let entries = vec![(ino, ".".to_string()), (1, "..".to_string())]
+                let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
                    .into_iter()
-                    .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
+                    .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
                        if let PdsFsEntry::Collection(col) = e {
                            if col.parent == ino as usize {
                                Some((i as u64, col.nsid.clone()))
···
                            None
                        }
                    }))
-                    .into_iter()
-                    .enumerate()
-                    .skip(offset as usize);
+                    .collect();
+                drop(inodes);
-                for (index, (inode_num, name)) in entries {
+                for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
                    let full = reply.add(
                        inode_num,
                        (index + 1) as i64,
···
                reply.ok();
            }
            Some(PdsFsEntry::Collection(c)) => {
-                let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())]
+                let parent_ino = c.parent;
+                let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())]
                    .into_iter()
-                    .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
+                    .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
                        if let PdsFsEntry::Record(record) = e {
                            if record.parent == ino as usize {
                                Some((i as u64, format!("{}.json", record.rkey)))
···
                                None
                            }
                        }))
-                    .into_iter()
-                    .enumerate()
-                    .skip(offset as usize);
+                    .collect();
+                drop(inodes);
-                for (index, (inode_num, name)) in entries {
+                for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
                    let full = reply.add(
                        inode_num,
                        (index + 1) as i64,
···
                reply.ok()
            }
-            _ => reply.error(libc::ENOENT),
+            _ => {
+                drop(inodes);
+                reply.error(libc::ENOENT)
+            }
        }
    }
···
        name: &std::ffi::OsStr,
        reply: fuser::ReplyEntry,
    ) {
-        match self.inodes.get_index(parent as usize) {
+        let inodes = self.inodes.lock().unwrap();
+        match inodes.get_index(parent as usize) {
            Some(PdsFsEntry::Root) => {
                let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
-                if let Some(ino) = self.inodes.get_index_of(&did) {
+                if let Some(ino) = inodes.get_index_of(&did) {
+                    drop(inodes);
                    reply.entry(&TTL, &self.attr(ino as u64), 0);
                } else {
+                    drop(inodes);
                    reply.error(libc::ENOENT)
                }
            }
···
                    parent: parent as usize,
                    nsid: name.to_string_lossy().to_string(),
                });
-                if let Some(ino) = self.inodes.get_index_of(&col) {
+                if let Some(ino) = inodes.get_index_of(&col) {
+                    drop(inodes);
                    reply.entry(&TTL, &self.attr(ino as u64), 0);
                } else {
+                    drop(inodes);
                    reply.error(libc::ENOENT)
                }
            }
···
                    parent: parent as usize,
                    rkey,
                });
-                if let Some(ino) = self.inodes.get_index_of(&record) {
+                if let Some(ino) = inodes.get_index_of(&record) {
+                    drop(inodes);
                    reply.entry(&TTL, &self.attr(ino as u64), 0);
                } else {
+                    drop(inodes);
                    reply.error(libc::ENOENT)
                }
            }
-            _ => reply.error(libc::ENOENT),
+            _ => {
+                drop(inodes);
+                reply.error(libc::ENOENT)
+            }
        }
    }
···
        _lock: Option<u64>,
        reply: fuser::ReplyData,
    ) {
-        if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) {
-            let col = self.inodes[r.parent].unwrap_collection();
-            let did = self.inodes[col.parent].unwrap_did();
-            let repo = &mut self.repos[did];
+        let inodes = self.inodes.lock().unwrap();
+        if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) {
+            let col = inodes[r.parent].unwrap_collection();
+            let did = inodes[col.parent].unwrap_did().clone();
            let key = format!("{}/{}", col.nsid, r.rkey);
+            let cache_key = format!("{}/{}", did, key);
+            drop(inodes);
+
+            // Check content cache first (for new records from firehose)
+            {
+                let cache = self.content_cache.lock().unwrap();
+                if let Some(content) = cache.get(&cache_key) {
+                    reply.data(&content.as_bytes()[offset as usize..]);
+                    return;
+                }
+            }
+            // Fall back to repo
+            let mut repos = self.repos.lock().unwrap();
+            let repo = &mut repos[&did];
            if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
-                reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]);
+                reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
                return;
            }
+        } else {
+            drop(inodes);
        }
        reply.error(libc::ENOENT);
    }