Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol. wisp.place
1use jacquard_common::types::string::AtUri; 2use jacquard_common::types::blob::BlobRef; 3use jacquard_common::IntoStatic; 4use jacquard::client::{Agent, AgentSession, AgentSessionExt}; 5use jacquard::prelude::IdentityResolver; 6use miette::IntoDiagnostic; 7use std::collections::HashMap; 8 9use crate::place_wisp::fs::{Directory as FsDirectory, EntryNode as FsEntryNode}; 10use crate::place_wisp::subfs::SubfsRecord; 11 12/// Extract all subfs URIs from a directory tree with their mount paths 13pub fn extract_subfs_uris(directory: &FsDirectory, current_path: String) -> Vec<(String, String)> { 14 let mut uris = Vec::new(); 15 16 for entry in &directory.entries { 17 let full_path = if current_path.is_empty() { 18 entry.name.to_string() 19 } else { 20 format!("{}/{}", current_path, entry.name) 21 }; 22 23 match &entry.node { 24 FsEntryNode::Subfs(subfs_node) => { 25 // Found a subfs node - store its URI and mount path 26 uris.push((subfs_node.subject.to_string(), full_path.clone())); 27 } 28 FsEntryNode::Directory(subdir) => { 29 // Recursively search subdirectories 30 let sub_uris = extract_subfs_uris(subdir, full_path); 31 uris.extend(sub_uris); 32 } 33 FsEntryNode::File(_) => { 34 // Files don't contain subfs 35 } 36 FsEntryNode::Unknown(_) => { 37 // Skip unknown nodes 38 } 39 } 40 } 41 42 uris 43} 44 45/// Fetch a subfs record from the PDS 46pub async fn fetch_subfs_record( 47 agent: &Agent<impl AgentSession + IdentityResolver>, 48 uri: &str, 49) -> miette::Result<SubfsRecord<'static>> { 50 // Parse URI: at://did/collection/rkey 51 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 52 53 if parts.len() < 3 { 54 return Err(miette::miette!("Invalid subfs URI: {}", uri)); 55 } 56 57 let _did = parts[0]; 58 let collection = parts[1]; 59 let _rkey = parts[2]; 60 61 if collection != "place.wisp.subfs" { 62 return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection)); 63 } 64 65 // Construct AT-URI for fetching 66 let 
at_uri = AtUri::new(uri).into_diagnostic()?; 67 68 // Fetch the record 69 let response = agent.get_record::<SubfsRecord>(&at_uri).await.into_diagnostic()?; 70 let record_output = response.into_output().into_diagnostic()?; 71 72 Ok(record_output.value.into_static()) 73} 74 75/// Merge blob maps from subfs records into the main blob map 76/// Returns the total number of blobs merged from all subfs records 77pub async fn merge_subfs_blob_maps( 78 agent: &Agent<impl AgentSession + IdentityResolver>, 79 subfs_uris: Vec<(String, String)>, 80 main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>, 81) -> miette::Result<usize> { 82 let mut total_merged = 0; 83 84 println!("Fetching {} subfs records for blob reuse...", subfs_uris.len()); 85 86 // Fetch all subfs records in parallel (but with some concurrency limit) 87 use futures::stream::{self, StreamExt}; 88 89 let subfs_results: Vec<_> = stream::iter(subfs_uris) 90 .map(|(uri, mount_path)| async move { 91 match fetch_subfs_record(agent, &uri).await { 92 Ok(record) => Some((record, mount_path)), 93 Err(e) => { 94 eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e); 95 None 96 } 97 } 98 }) 99 .buffer_unordered(5) 100 .collect() 101 .await; 102 103 // Convert subfs Directory to fs Directory for blob extraction 104 // Note: We need to extract blobs from the subfs record's root 105 for result in subfs_results { 106 if let Some((subfs_record, mount_path)) = result { 107 // Extract blobs from this subfs record's root 108 // The blob_map module works with fs::Directory, but subfs::Directory has the same structure 109 // We need to convert or work directly with the entries 110 111 let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone()); 112 let count = subfs_blob_map.len(); 113 114 for (path, blob_info) in subfs_blob_map { 115 main_blob_map.insert(path, blob_info); 116 } 117 118 total_merged += count; 119 println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path); 120 } 121 } 122 123 
Ok(total_merged) 124} 125 126/// Extract blobs from a subfs directory (works with subfs::Directory) 127/// Returns a map of file paths to their blob refs and CIDs 128fn extract_subfs_blobs( 129 directory: &crate::place_wisp::subfs::Directory, 130 current_path: String, 131) -> HashMap<String, (BlobRef<'static>, String)> { 132 let mut blob_map = HashMap::new(); 133 134 for entry in &directory.entries { 135 let full_path = if current_path.is_empty() { 136 entry.name.to_string() 137 } else { 138 format!("{}/{}", current_path, entry.name) 139 }; 140 141 match &entry.node { 142 crate::place_wisp::subfs::EntryNode::File(file_node) => { 143 let blob_ref = &file_node.blob; 144 let cid_string = blob_ref.blob().r#ref.to_string(); 145 blob_map.insert( 146 full_path, 147 (blob_ref.clone().into_static(), cid_string) 148 ); 149 } 150 crate::place_wisp::subfs::EntryNode::Directory(subdir) => { 151 let sub_map = extract_subfs_blobs(subdir, full_path); 152 blob_map.extend(sub_map); 153 } 154 crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => { 155 // Nested subfs - these should be resolved recursively in the main flow 156 // For now, we skip them (they'll be fetched separately) 157 eprintln!(" ⚠️ Found nested subfs at {}, skipping (should be fetched separately)", full_path); 158 } 159 crate::place_wisp::subfs::EntryNode::Unknown(_) => { 160 // Skip unknown nodes 161 } 162 } 163 } 164 165 blob_map 166} 167 168/// Count total files in a directory tree 169pub fn count_files_in_directory(directory: &FsDirectory) -> usize { 170 let mut count = 0; 171 172 for entry in &directory.entries { 173 match &entry.node { 174 FsEntryNode::File(_) => count += 1, 175 FsEntryNode::Directory(subdir) => { 176 count += count_files_in_directory(subdir); 177 } 178 FsEntryNode::Subfs(_) => { 179 // Subfs nodes don't count towards the main manifest file count 180 } 181 FsEntryNode::Unknown(_) => {} 182 } 183 } 184 185 count 186} 187 188/// Estimate JSON size of a directory tree 189pub fn 
estimate_directory_size(directory: &FsDirectory) -> usize { 190 // Serialize to JSON and measure 191 match serde_json::to_string(directory) { 192 Ok(json) => json.len(), 193 Err(_) => 0, 194 } 195} 196 197/// Information about a directory that could be split into a subfs record 198#[derive(Debug)] 199pub struct SplittableDirectory { 200 pub path: String, 201 pub directory: FsDirectory<'static>, 202 pub size: usize, 203 pub file_count: usize, 204} 205 206/// Find large directories that could be split into subfs records 207/// Returns directories sorted by size (largest first) 208pub fn find_large_directories(directory: &FsDirectory, current_path: String) -> Vec<SplittableDirectory> { 209 let mut result = Vec::new(); 210 211 for entry in &directory.entries { 212 if let FsEntryNode::Directory(subdir) = &entry.node { 213 let dir_path = if current_path.is_empty() { 214 entry.name.to_string() 215 } else { 216 format!("{}/{}", current_path, entry.name) 217 }; 218 219 let size = estimate_directory_size(subdir); 220 let file_count = count_files_in_directory(subdir); 221 222 result.push(SplittableDirectory { 223 path: dir_path.clone(), 224 directory: (*subdir.clone()).into_static(), 225 size, 226 file_count, 227 }); 228 229 // Recursively find subdirectories 230 let subdirs = find_large_directories(subdir, dir_path); 231 result.extend(subdirs); 232 } 233 } 234 235 // Sort by size (largest first) 236 result.sort_by(|a, b| b.size.cmp(&a.size)); 237 238 result 239} 240 241/// Replace a directory with a subfs node in the tree 242pub fn replace_directory_with_subfs( 243 directory: FsDirectory<'static>, 244 target_path: &str, 245 subfs_uri: &str, 246 flat: bool, 247) -> miette::Result<FsDirectory<'static>> { 248 use jacquard_common::CowStr; 249 use crate::place_wisp::fs::{Entry, Subfs}; 250 251 let path_parts: Vec<&str> = target_path.split('/').collect(); 252 253 if path_parts.is_empty() { 254 return Err(miette::miette!("Cannot replace root directory")); 255 } 256 257 // Parse the 
subfs URI and make it owned/'static 258 let at_uri = AtUri::new_cow(jacquard_common::CowStr::from(subfs_uri.to_string())).into_diagnostic()?; 259 260 // If this is a root-level directory 261 if path_parts.len() == 1 { 262 let target_name = path_parts[0]; 263 let new_entries: Vec<Entry> = directory.entries.into_iter().map(|entry| { 264 if entry.name == target_name { 265 // Replace this directory with a subfs node 266 Entry::new() 267 .name(entry.name) 268 .node(FsEntryNode::Subfs(Box::new( 269 Subfs::new() 270 .r#type(CowStr::from("subfs")) 271 .subject(at_uri.clone()) 272 .flat(Some(flat)) 273 .build() 274 ))) 275 .build() 276 } else { 277 entry 278 } 279 }).collect(); 280 281 return Ok(FsDirectory::new() 282 .r#type(CowStr::from("directory")) 283 .entries(new_entries) 284 .build()); 285 } 286 287 // Recursively navigate to parent directory 288 let first_part = path_parts[0]; 289 let remaining_path = path_parts[1..].join("/"); 290 291 let new_entries: Vec<Entry> = directory.entries.into_iter().filter_map(|entry| { 292 if entry.name == first_part { 293 if let FsEntryNode::Directory(subdir) = entry.node { 294 // Recursively process this subdirectory 295 match replace_directory_with_subfs((*subdir).into_static(), &remaining_path, subfs_uri, flat) { 296 Ok(updated_subdir) => { 297 Some(Entry::new() 298 .name(entry.name) 299 .node(FsEntryNode::Directory(Box::new(updated_subdir))) 300 .build()) 301 } 302 Err(_) => None, // Skip entries that fail to update 303 } 304 } else { 305 Some(entry) 306 } 307 } else { 308 Some(entry) 309 } 310 }).collect(); 311 312 Ok(FsDirectory::new() 313 .r#type(CowStr::from("directory")) 314 .entries(new_entries) 315 .build()) 316} 317 318/// Delete a subfs record from the PDS 319pub async fn delete_subfs_record( 320 agent: &Agent<impl AgentSession + IdentityResolver>, 321 uri: &str, 322) -> miette::Result<()> { 323 use jacquard_common::types::uri::RecordUri; 324 325 // Construct AT-URI and convert to RecordUri 326 let at_uri = 
AtUri::new(uri).into_diagnostic()?; 327 let record_uri: RecordUri<'_, crate::place_wisp::subfs::SubfsRecordRecord> = RecordUri::try_from_uri(at_uri).into_diagnostic()?; 328 329 let rkey = record_uri.rkey() 330 .ok_or_else(|| miette::miette!("Invalid subfs URI: missing rkey"))? 331 .clone(); 332 333 agent.delete_record::<SubfsRecord>(rkey).await.into_diagnostic()?; 334 335 Ok(()) 336}