Monorepo for wisp.place — a static site hosting service built on top of the AT Protocol.
1use jacquard_common::types::string::AtUri; 2use jacquard_common::types::blob::BlobRef; 3use jacquard_common::IntoStatic; 4use jacquard::client::{Agent, AgentSession, AgentSessionExt}; 5use jacquard::prelude::IdentityResolver; 6use miette::IntoDiagnostic; 7use std::collections::HashMap; 8 9use crate::place_wisp::fs::{Directory as FsDirectory, EntryNode as FsEntryNode}; 10use crate::place_wisp::subfs::SubfsRecord; 11 12/// Extract all subfs URIs from a directory tree with their mount paths 13pub fn extract_subfs_uris(directory: &FsDirectory, current_path: String) -> Vec<(String, String)> { 14 let mut uris = Vec::new(); 15 16 for entry in &directory.entries { 17 let full_path = if current_path.is_empty() { 18 entry.name.to_string() 19 } else { 20 format!("{}/{}", current_path, entry.name) 21 }; 22 23 match &entry.node { 24 FsEntryNode::Subfs(subfs_node) => { 25 // Found a subfs node - store its URI and mount path 26 uris.push((subfs_node.subject.to_string(), full_path.clone())); 27 } 28 FsEntryNode::Directory(subdir) => { 29 // Recursively search subdirectories 30 let sub_uris = extract_subfs_uris(subdir, full_path); 31 uris.extend(sub_uris); 32 } 33 FsEntryNode::File(_) => { 34 // Files don't contain subfs 35 } 36 FsEntryNode::Unknown(_) => { 37 // Skip unknown nodes 38 } 39 } 40 } 41 42 uris 43} 44 45/// Fetch a subfs record from the PDS 46pub async fn fetch_subfs_record( 47 agent: &Agent<impl AgentSession + IdentityResolver>, 48 uri: &str, 49) -> miette::Result<SubfsRecord<'static>> { 50 // Parse URI: at://did/collection/rkey 51 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 52 53 if parts.len() < 3 { 54 return Err(miette::miette!("Invalid subfs URI: {}", uri)); 55 } 56 57 let _did = parts[0]; 58 let collection = parts[1]; 59 let _rkey = parts[2]; 60 61 if collection != "place.wisp.subfs" { 62 return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection)); 63 } 64 65 // Construct AT-URI for fetching 66 let 
at_uri = AtUri::new(uri).into_diagnostic()?; 67 68 // Fetch the record 69 let response = agent.get_record::<SubfsRecord>(&at_uri).await.into_diagnostic()?; 70 let record_output = response.into_output().into_diagnostic()?; 71 72 Ok(record_output.value.into_static()) 73} 74 75/// Recursively fetch all subfs records (including nested ones) 76/// Returns a list of (mount_path, SubfsRecord) tuples 77/// Note: Multiple records can have the same mount_path (for flat-merged chunks) 78pub async fn fetch_all_subfs_records_recursive( 79 agent: &Agent<impl AgentSession + IdentityResolver>, 80 initial_uris: Vec<(String, String)>, 81) -> miette::Result<Vec<(String, SubfsRecord<'static>)>> { 82 use futures::stream::{self, StreamExt}; 83 84 let mut all_subfs: Vec<(String, SubfsRecord<'static>)> = Vec::new(); 85 let mut fetched_uris: std::collections::HashSet<String> = std::collections::HashSet::new(); 86 let mut to_fetch = initial_uris; 87 88 if to_fetch.is_empty() { 89 return Ok(all_subfs); 90 } 91 92 println!("Found {} subfs records, fetching recursively...", to_fetch.len()); 93 94 let mut iteration = 0; 95 const MAX_ITERATIONS: usize = 10; 96 97 while !to_fetch.is_empty() && iteration < MAX_ITERATIONS { 98 iteration += 1; 99 println!(" Iteration {}: fetching {} subfs records...", iteration, to_fetch.len()); 100 101 let subfs_results: Vec<_> = stream::iter(to_fetch.clone()) 102 .map(|(uri, mount_path)| async move { 103 match fetch_subfs_record(agent, &uri).await { 104 Ok(record) => Some((mount_path, record, uri)), 105 Err(e) => { 106 eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e); 107 None 108 } 109 } 110 }) 111 .buffer_unordered(5) 112 .collect() 113 .await; 114 115 // Process results and find nested subfs 116 let mut newly_found_uris = Vec::new(); 117 for result in subfs_results { 118 if let Some((mount_path, record, uri)) = result { 119 println!(" ✓ Fetched subfs at {}", mount_path); 120 121 // Extract nested subfs URIs from this record 122 let nested_uris = 
extract_subfs_uris_from_subfs_dir(&record.root, mount_path.clone()); 123 newly_found_uris.extend(nested_uris); 124 125 all_subfs.push((mount_path, record)); 126 fetched_uris.insert(uri); 127 } 128 } 129 130 // Filter out already-fetched URIs (based on URI, not path) 131 to_fetch = newly_found_uris 132 .into_iter() 133 .filter(|(uri, _)| !fetched_uris.contains(uri)) 134 .collect(); 135 } 136 137 if iteration >= MAX_ITERATIONS { 138 eprintln!("⚠️ Max iterations reached while fetching nested subfs"); 139 } 140 141 println!(" Total subfs records fetched: {}", all_subfs.len()); 142 143 Ok(all_subfs) 144} 145 146/// Extract subfs URIs from a subfs::Directory 147fn extract_subfs_uris_from_subfs_dir( 148 directory: &crate::place_wisp::subfs::Directory, 149 current_path: String, 150) -> Vec<(String, String)> { 151 let mut uris = Vec::new(); 152 153 for entry in &directory.entries { 154 match &entry.node { 155 crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => { 156 // Check if this is a chunk entry (chunk0, chunk1, etc.) 
157 // Chunks should be flat-merged, so use the parent's path 158 let mount_path = if entry.name.starts_with("chunk") && 159 entry.name.chars().skip(5).all(|c| c.is_ascii_digit()) { 160 // This is a chunk - use parent's path for flat merge 161 println!(" → Found chunk {} at {}, will flat-merge to {}", entry.name, current_path, current_path); 162 current_path.clone() 163 } else { 164 // Normal subfs - append name to path 165 if current_path.is_empty() { 166 entry.name.to_string() 167 } else { 168 format!("{}/{}", current_path, entry.name) 169 } 170 }; 171 172 uris.push((subfs_node.subject.to_string(), mount_path)); 173 } 174 crate::place_wisp::subfs::EntryNode::Directory(subdir) => { 175 let full_path = if current_path.is_empty() { 176 entry.name.to_string() 177 } else { 178 format!("{}/{}", current_path, entry.name) 179 }; 180 let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path); 181 uris.extend(nested); 182 } 183 _ => {} 184 } 185 } 186 187 uris 188} 189 190/// Merge blob maps from subfs records into the main blob map (RECURSIVE) 191/// Returns the total number of blobs merged from all subfs records 192pub async fn merge_subfs_blob_maps( 193 agent: &Agent<impl AgentSession + IdentityResolver>, 194 subfs_uris: Vec<(String, String)>, 195 main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>, 196) -> miette::Result<usize> { 197 // Fetch all subfs records recursively 198 let all_subfs = fetch_all_subfs_records_recursive(agent, subfs_uris).await?; 199 200 let mut total_merged = 0; 201 202 // Extract blobs from all fetched subfs records 203 // Skip parent records that only contain chunk references (no actual files) 204 for (mount_path, subfs_record) in all_subfs { 205 // Check if this record only contains chunk subfs references (no files) 206 let only_has_chunks = subfs_record.root.entries.iter().all(|e| { 207 matches!(&e.node, crate::place_wisp::subfs::EntryNode::Subfs(_)) && 208 e.name.starts_with("chunk") && 209 e.name.chars().skip(5).all(|c| 
c.is_ascii_digit()) 210 }); 211 212 if only_has_chunks && !subfs_record.root.entries.is_empty() { 213 // This is a parent containing only chunks - skip it, blobs are in the chunks 214 println!(" → Skipping parent subfs at {} ({} chunks, no files)", mount_path, subfs_record.root.entries.len()); 215 continue; 216 } 217 218 let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone()); 219 let count = subfs_blob_map.len(); 220 221 for (path, blob_info) in subfs_blob_map { 222 main_blob_map.insert(path, blob_info); 223 } 224 225 total_merged += count; 226 println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path); 227 } 228 229 Ok(total_merged) 230} 231 232/// Extract blobs from a subfs directory (works with subfs::Directory) 233/// Returns a map of file paths to their blob refs and CIDs 234fn extract_subfs_blobs( 235 directory: &crate::place_wisp::subfs::Directory, 236 current_path: String, 237) -> HashMap<String, (BlobRef<'static>, String)> { 238 let mut blob_map = HashMap::new(); 239 240 for entry in &directory.entries { 241 let full_path = if current_path.is_empty() { 242 entry.name.to_string() 243 } else { 244 format!("{}/{}", current_path, entry.name) 245 }; 246 247 match &entry.node { 248 crate::place_wisp::subfs::EntryNode::File(file_node) => { 249 let blob_ref = &file_node.blob; 250 let cid_string = blob_ref.blob().r#ref.to_string(); 251 blob_map.insert( 252 full_path, 253 (blob_ref.clone().into_static(), cid_string) 254 ); 255 } 256 crate::place_wisp::subfs::EntryNode::Directory(subdir) => { 257 let sub_map = extract_subfs_blobs(subdir, full_path); 258 blob_map.extend(sub_map); 259 } 260 crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => { 261 // Nested subfs - these should be resolved recursively in the main flow 262 // For now, we skip them (they'll be fetched separately) 263 eprintln!(" ⚠️ Found nested subfs at {}, skipping (should be fetched separately)", full_path); 264 } 265 
crate::place_wisp::subfs::EntryNode::Unknown(_) => { 266 // Skip unknown nodes 267 } 268 } 269 } 270 271 blob_map 272} 273 274/// Count total files in a directory tree 275pub fn count_files_in_directory(directory: &FsDirectory) -> usize { 276 let mut count = 0; 277 278 for entry in &directory.entries { 279 match &entry.node { 280 FsEntryNode::File(_) => count += 1, 281 FsEntryNode::Directory(subdir) => { 282 count += count_files_in_directory(subdir); 283 } 284 FsEntryNode::Subfs(_) => { 285 // Subfs nodes don't count towards the main manifest file count 286 } 287 FsEntryNode::Unknown(_) => {} 288 } 289 } 290 291 count 292} 293 294/// Estimate JSON size of a directory tree 295pub fn estimate_directory_size(directory: &FsDirectory) -> usize { 296 // Serialize to JSON and measure 297 match serde_json::to_string(directory) { 298 Ok(json) => json.len(), 299 Err(_) => 0, 300 } 301} 302 303/// Information about a directory that could be split into a subfs record 304#[derive(Debug)] 305pub struct SplittableDirectory { 306 pub path: String, 307 pub directory: FsDirectory<'static>, 308 pub size: usize, 309 pub file_count: usize, 310} 311 312/// Find large directories that could be split into subfs records 313/// Returns directories sorted by size (largest first) 314pub fn find_large_directories(directory: &FsDirectory, current_path: String) -> Vec<SplittableDirectory> { 315 let mut result = Vec::new(); 316 317 for entry in &directory.entries { 318 if let FsEntryNode::Directory(subdir) = &entry.node { 319 let dir_path = if current_path.is_empty() { 320 entry.name.to_string() 321 } else { 322 format!("{}/{}", current_path, entry.name) 323 }; 324 325 let size = estimate_directory_size(subdir); 326 let file_count = count_files_in_directory(subdir); 327 328 result.push(SplittableDirectory { 329 path: dir_path.clone(), 330 directory: (*subdir.clone()).into_static(), 331 size, 332 file_count, 333 }); 334 335 // Recursively find subdirectories 336 let subdirs = 
find_large_directories(subdir, dir_path); 337 result.extend(subdirs); 338 } 339 } 340 341 // Sort by size (largest first) 342 result.sort_by(|a, b| b.size.cmp(&a.size)); 343 344 result 345} 346 347/// Replace a directory with a subfs node in the tree 348pub fn replace_directory_with_subfs( 349 directory: FsDirectory<'static>, 350 target_path: &str, 351 subfs_uri: &str, 352 flat: bool, 353) -> miette::Result<FsDirectory<'static>> { 354 use jacquard_common::CowStr; 355 use crate::place_wisp::fs::{Entry, Subfs}; 356 357 let path_parts: Vec<&str> = target_path.split('/').collect(); 358 359 if path_parts.is_empty() { 360 return Err(miette::miette!("Cannot replace root directory")); 361 } 362 363 // Parse the subfs URI and make it owned/'static 364 let at_uri = AtUri::new_cow(jacquard_common::CowStr::from(subfs_uri.to_string())).into_diagnostic()?; 365 366 // If this is a root-level directory 367 if path_parts.len() == 1 { 368 let target_name = path_parts[0]; 369 let new_entries: Vec<Entry> = directory.entries.into_iter().map(|entry| { 370 if entry.name == target_name { 371 // Replace this directory with a subfs node 372 Entry::new() 373 .name(entry.name) 374 .node(FsEntryNode::Subfs(Box::new( 375 Subfs::new() 376 .r#type(CowStr::from("subfs")) 377 .subject(at_uri.clone()) 378 .flat(Some(flat)) 379 .build() 380 ))) 381 .build() 382 } else { 383 entry 384 } 385 }).collect(); 386 387 return Ok(FsDirectory::new() 388 .r#type(CowStr::from("directory")) 389 .entries(new_entries) 390 .build()); 391 } 392 393 // Recursively navigate to parent directory 394 let first_part = path_parts[0]; 395 let remaining_path = path_parts[1..].join("/"); 396 397 let new_entries: Vec<Entry> = directory.entries.into_iter().filter_map(|entry| { 398 if entry.name == first_part { 399 if let FsEntryNode::Directory(subdir) = entry.node { 400 // Recursively process this subdirectory 401 match replace_directory_with_subfs((*subdir).into_static(), &remaining_path, subfs_uri, flat) { 402 
Ok(updated_subdir) => { 403 Some(Entry::new() 404 .name(entry.name) 405 .node(FsEntryNode::Directory(Box::new(updated_subdir))) 406 .build()) 407 } 408 Err(_) => None, // Skip entries that fail to update 409 } 410 } else { 411 Some(entry) 412 } 413 } else { 414 Some(entry) 415 } 416 }).collect(); 417 418 Ok(FsDirectory::new() 419 .r#type(CowStr::from("directory")) 420 .entries(new_entries) 421 .build()) 422} 423 424/// Delete a subfs record from the PDS 425pub async fn delete_subfs_record( 426 agent: &Agent<impl AgentSession + IdentityResolver>, 427 uri: &str, 428) -> miette::Result<()> { 429 use jacquard_common::types::uri::RecordUri; 430 431 // Construct AT-URI and convert to RecordUri 432 let at_uri = AtUri::new(uri).into_diagnostic()?; 433 let record_uri: RecordUri<'_, crate::place_wisp::subfs::SubfsRecordRecord> = RecordUri::try_from_uri(at_uri).into_diagnostic()?; 434 435 let rkey = record_uri.rkey() 436 .ok_or_else(|| miette::miette!("Invalid subfs URI: missing rkey"))? 437 .clone(); 438 439 agent.delete_record::<SubfsRecord>(rkey).await.into_diagnostic()?; 440 441 Ok(()) 442} 443 444/// Split a large directory into multiple smaller chunks 445/// Returns a list of chunk directories, each small enough to fit in a subfs record 446pub fn split_directory_into_chunks( 447 directory: &FsDirectory, 448 max_size: usize, 449) -> Vec<FsDirectory<'static>> { 450 use jacquard_common::CowStr; 451 452 let mut chunks = Vec::new(); 453 let mut current_chunk_entries = Vec::new(); 454 let mut current_chunk_size = 100; // Base size for directory structure 455 456 for entry in &directory.entries { 457 // Estimate the size of this entry 458 let entry_size = estimate_entry_size(entry); 459 460 // If adding this entry would exceed the max size, start a new chunk 461 if !current_chunk_entries.is_empty() && (current_chunk_size + entry_size > max_size) { 462 // Create a chunk from current entries 463 let chunk = FsDirectory::new() 464 .r#type(CowStr::from("directory")) 465 
.entries(current_chunk_entries.clone()) 466 .build(); 467 468 chunks.push(chunk); 469 470 // Start new chunk 471 current_chunk_entries.clear(); 472 current_chunk_size = 100; 473 } 474 475 current_chunk_entries.push(entry.clone().into_static()); 476 current_chunk_size += entry_size; 477 } 478 479 // Add the last chunk if it has any entries 480 if !current_chunk_entries.is_empty() { 481 let chunk = FsDirectory::new() 482 .r#type(CowStr::from("directory")) 483 .entries(current_chunk_entries) 484 .build(); 485 chunks.push(chunk); 486 } 487 488 chunks 489} 490 491/// Estimate the JSON size of a single entry 492fn estimate_entry_size(entry: &crate::place_wisp::fs::Entry) -> usize { 493 match serde_json::to_string(entry) { 494 Ok(json) => json.len(), 495 Err(_) => 500, // Conservative estimate if serialization fails 496 } 497}