Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
use crate::blob_map;
use crate::download;
use crate::metadata::SiteMetadata;
use crate::place_wisp::fs::*;
use crate::subfs_utils;
use jacquard::CowStr;
use jacquard::prelude::IdentityResolver;
use jacquard_common::types::string::Did;
use jacquard_common::xrpc::XrpcExt;
use jacquard_identity::PublicResolver;
use miette::IntoDiagnostic;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use url::Url;

/// Pull a site from the PDS to a local directory
pub async fn pull_site(
    input: CowStr<'static>,
    rkey: CowStr<'static>,
    output_dir: PathBuf,
) -> miette::Result<()> {
    println!("Pulling site {} from {}...", rkey, input);

    // Resolve handle to DID if needed
    let resolver = PublicResolver::default();
    let did = if input.starts_with("did:") {
        Did::new(&input).into_diagnostic()?
    } else {
        // It's a handle, resolve it
        let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
        resolver.resolve_handle(&handle).await.into_diagnostic()?
    };

    // Resolve PDS endpoint for the DID
    let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
    println!("Resolved PDS: {}", pds_url);

    // Fetch the place.wisp.fs record
    println!("Fetching record from PDS...");
    let client = reqwest::Client::new();

    // Use com.atproto.repo.getRecord
    use jacquard::api::com_atproto::repo::get_record::GetRecord;
    use jacquard_common::types::ident::AtIdentifier;
    use jacquard_common::types::string::{RecordKey, Rkey as RkeyType};

    let rkey_parsed = RkeyType::new(&rkey).into_diagnostic()?;
    let request = GetRecord::new()
        .repo(AtIdentifier::Did(did.clone()))
        .collection(CowStr::from("place.wisp.fs"))
        .rkey(RecordKey::from(rkey_parsed))
        .build();

    let response = client
        .xrpc(pds_url.clone())
        .send(&request)
        .await
        .into_diagnostic()?;

    let record_output = response.into_output().into_diagnostic()?;
    let record_cid = record_output
        .cid
        .as_ref()
        .map(|c| c.to_string())
        .unwrap_or_default();

    // Parse the record value as Fs
    use jacquard_common::types::value::from_data;
    let fs_record: Fs = from_data(&record_output.value).into_diagnostic()?;

    let file_count = fs_record
        .file_count
        .map(|c| c.to_string())
        .unwrap_or_else(|| "?".to_string());
    println!("Found site '{}' with {} files (in main record)", fs_record.site, file_count);

    // Check for and expand subfs nodes
    let expanded_root = expand_subfs_in_pull(&fs_record.root, &pds_url, did.as_str()).await?;
    let total_file_count = subfs_utils::count_files_in_directory(&expanded_root);

    if total_file_count as i64 != fs_record.file_count.unwrap_or(0) {
        println!("Total files after expanding subfs: {}", total_file_count);
    }

    // Load existing metadata for incremental updates
    let existing_metadata = SiteMetadata::load(&output_dir)?;
    let existing_file_cids = existing_metadata
        .as_ref()
        .map(|m| m.file_cids.clone())
        .unwrap_or_default();

    // Extract blob map from the expanded manifest
    let new_blob_map = blob_map::extract_blob_map(&expanded_root);
    let new_file_cids: HashMap<String, String> = new_blob_map
        .iter()
        .map(|(path, (_blob_ref, cid))| (path.clone(), cid.clone()))
        .collect();
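    // Sketch of the incremental-pull bookkeeping (illustrative CIDs; the real
    // values come from `SiteMetadata`, saved below via
    // `SiteMetadata::new(record_cid, new_file_cids)`):
    //
    //     record_cid: "bafyrei..."                      // CID of the fs record
    //     file_cids:  { "index.html"   => "bafkrei...", // path -> blob CID
    //                   "css/site.css" => "bafkrei..." }
    //
    // A file is reused rather than re-downloaded when its path maps to the same
    // CID in both the old and new maps (see `download_directory` below).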
    // Clean up any leftover temp directories from previous failed attempts
    let parent = output_dir
        .parent()
        .unwrap_or_else(|| std::path::Path::new("."));
    let output_name = output_dir
        .file_name()
        .unwrap_or_else(|| std::ffi::OsStr::new("site"))
        .to_string_lossy();
    let temp_prefix = format!(".tmp-{}-", output_name);

    if let Ok(entries) = parent.read_dir() {
        for entry in entries.flatten() {
            let name = entry.file_name();
            if name.to_string_lossy().starts_with(&temp_prefix) {
                let _ = std::fs::remove_dir_all(entry.path());
            }
        }
    }

    // Check if we need to update (verify files actually exist, not just metadata)
    if let Some(metadata) = &existing_metadata {
        if metadata.record_cid == record_cid {
            // Verify that the output directory actually exists and has the expected files
            let has_all_files = output_dir.exists() && {
                // Count actual files on disk (excluding metadata)
                let mut actual_file_count = 0;
                if let Ok(entries) = std::fs::read_dir(&output_dir) {
                    for entry in entries.flatten() {
                        let name = entry.file_name();
                        if !name.to_string_lossy().starts_with(".wisp-metadata")
                            && entry.path().is_file()
                        {
                            actual_file_count += 1;
                        }
                    }
                }

                // Compare with the expected file count from metadata
                let expected_count = metadata.file_cids.len();
                actual_file_count > 0 && actual_file_count >= expected_count
            };

            if has_all_files {
                println!("Site is already up to date!");
                return Ok(());
            } else {
                println!("Site metadata exists but files are missing, re-downloading...");
            }
        }
    }

    // Create a temporary directory for the atomic update. It lives in the
    // parent directory to avoid issues with a non-existent output_dir.
    let temp_dir_name = format!(".tmp-{}-{}", output_name, chrono::Utc::now().timestamp());
    let temp_dir = parent.join(temp_dir_name);
    std::fs::create_dir_all(&temp_dir).into_diagnostic()?;

    println!("Downloading files...");
    let mut downloaded = 0;
    let mut reused = 0;

    // Download files recursively (using the expanded root)
    let download_result = download_directory(
        &expanded_root,
        &temp_dir,
        &pds_url,
        did.as_str(),
        &new_blob_map,
        &existing_file_cids,
        &output_dir,
        String::new(),
        &mut downloaded,
        &mut reused,
    )
    .await;

    // If the download failed, clean up the temp directory
    if let Err(e) = download_result {
        let _ = std::fs::remove_dir_all(&temp_dir);
        return Err(e);
    }

    println!("Downloaded {} files, reused {} files", downloaded, reused);

    // Save metadata
    let metadata = SiteMetadata::new(record_cid, new_file_cids);
    metadata.save(&temp_dir)?;
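    // The swap below is the usual stage-then-rename pattern: everything was
    // written under `.tmp-<name>-<timestamp>` and is only moved into place once
    // complete, so an interrupted pull never leaves a half-updated site.
    // `std::fs::rename` is atomic only within a single filesystem, which is one
    // reason the temp dir lives next to the output dir rather than under the
    // system temp location.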
    // Move files from temp to the output directory
    let output_abs = std::fs::canonicalize(&output_dir).unwrap_or_else(|_| output_dir.clone());
    let current_dir = std::env::current_dir().into_diagnostic()?;

    // Special handling for pulling into the current directory
    if output_abs == current_dir {
        // Move files from temp to the current directory
        for entry in std::fs::read_dir(&temp_dir).into_diagnostic()? {
            let entry = entry.into_diagnostic()?;
            let dest = current_dir.join(entry.file_name());

            // Remove the existing file/dir if it exists
            if dest.exists() {
                if dest.is_dir() {
                    std::fs::remove_dir_all(&dest).into_diagnostic()?;
                } else {
                    std::fs::remove_file(&dest).into_diagnostic()?;
                }
            }

            // Move from temp to the current dir
            std::fs::rename(entry.path(), dest).into_diagnostic()?;
        }

        // Clean up the temp directory
        std::fs::remove_dir_all(&temp_dir).into_diagnostic()?;
    } else {
        // If the output directory exists and has content, remove it first
        if output_dir.exists() {
            std::fs::remove_dir_all(&output_dir).into_diagnostic()?;
        }

        // Ensure the parent directory exists
        if let Some(parent) = output_dir.parent() {
            if !parent.as_os_str().is_empty() && !parent.exists() {
                std::fs::create_dir_all(parent).into_diagnostic()?;
            }
        }

        // Rename temp to the final location, cleaning up on failure
        if let Err(e) = std::fs::rename(&temp_dir, &output_dir) {
            let _ = std::fs::remove_dir_all(&temp_dir);
            return Err(miette::miette!("Failed to move temp directory: {}", e));
        }
    }

    println!("✓ Site pulled successfully to {}", output_dir.display());

    Ok(())
}
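// `download_directory` below returns a boxed future instead of being an
// `async fn` because it recurses into subdirectories: a directly recursive
// `async fn` would produce an infinitely-sized state machine, so the cycle is
// broken with `Pin<Box<dyn Future>>`.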
/// Recursively download a directory with concurrent downloads
fn download_directory<'a>(
    dir: &'a Directory<'_>,
    output_dir: &'a Path,
    pds_url: &'a Url,
    did: &'a str,
    new_blob_map: &'a HashMap<String, (jacquard_common::types::blob::BlobRef<'static>, String)>,
    existing_file_cids: &'a HashMap<String, String>,
    existing_output_dir: &'a Path,
    path_prefix: String,
    downloaded: &'a mut usize,
    reused: &'a mut usize,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send + 'a>> {
    Box::pin(async move {
        use futures::stream::{self, StreamExt};

        // Collect download, copy, and directory tasks separately
        struct DownloadTask {
            path: String,
            output_path: PathBuf,
            blob: jacquard_common::types::blob::BlobRef<'static>,
            base64: bool,
            gzip: bool,
        }

        struct CopyTask {
            path: String,
            from: PathBuf,
            to: PathBuf,
        }

        let mut download_tasks = Vec::new();
        let mut copy_tasks = Vec::new();
        let mut dir_tasks = Vec::new();

        for entry in &dir.entries {
            let entry_name = entry.name.as_str();
            let current_path = if path_prefix.is_empty() {
                entry_name.to_string()
            } else {
                format!("{}/{}", path_prefix, entry_name)
            };

            match &entry.node {
                EntryNode::File(file) => {
                    let output_path = output_dir.join(entry_name);

                    // Reuse the existing file when the old and new CIDs match
                    let should_copy = match (
                        new_blob_map.get(&current_path),
                        existing_file_cids.get(&current_path),
                    ) {
                        (Some((_blob_ref, new_cid)), Some(existing_cid))
                            if existing_cid == new_cid =>
                        {
                            let existing_path = existing_output_dir.join(&current_path);
                            if existing_path.exists() {
                                copy_tasks.push(CopyTask {
                                    path: current_path.clone(),
                                    from: existing_path,
                                    to: output_path.clone(),
                                });
                                true
                            } else {
                                false
                            }
                        }
                        _ => false,
                    };

                    if !should_copy {
                        use jacquard_common::IntoStatic;
                        // File needs to be downloaded
                        download_tasks.push(DownloadTask {
                            path: current_path,
                            output_path,
                            blob: file.blob.clone().into_static(),
                            base64: file.base64.unwrap_or(false),
                            gzip: file
                                .encoding
                                .as_ref()
                                .map(|e| e.as_str() == "gzip")
                                .unwrap_or(false),
                        });
                    }
                }
                EntryNode::Directory(subdir) => {
                    let subdir_path = output_dir.join(entry_name);
                    dir_tasks.push((subdir.as_ref().clone(), subdir_path, current_path));
                }
                EntryNode::Subfs(_) => {
                    println!("  ⚠ Skipping subfs node at {} (should have been expanded)", current_path);
                }
                EntryNode::Unknown(_) => {
                    println!("  ⚠ Skipping unknown node type for {}", current_path);
                }
            }
        }

        // Execute copy tasks (fast, do them all)
        for task in copy_tasks {
            std::fs::copy(&task.from, &task.to).into_diagnostic()?;
            *reused += 1;
            println!("  ✓ Reused {}", task.path);
        }

        // Execute download tasks with a concurrency limit (20 concurrent downloads)
        const DOWNLOAD_CONCURRENCY: usize = 20;

        let pds_url_clone = pds_url.clone();
        let did_str = did.to_string();

        let download_results: Vec<miette::Result<(String, PathBuf, Vec<u8>)>> =
            stream::iter(download_tasks)
                .map(|task| {
                    let pds = pds_url_clone.clone();
                    let did_copy = did_str.clone();

                    async move {
                        println!("  ↓ Downloading {}", task.path);
                        let data = download::download_and_decompress_blob(
                            &pds,
                            &task.blob,
                            &did_copy,
                            task.base64,
                            task.gzip,
                        )
                        .await?;

                        Ok::<_, miette::Report>((task.path, task.output_path, data))
                    }
                })
                .buffer_unordered(DOWNLOAD_CONCURRENCY)
                .collect()
                .await;

        // Write downloaded files to disk
        for result in download_results {
            let (path, output_path, data) = result?;
            std::fs::write(&output_path, data).into_diagnostic()?;
            *downloaded += 1;
            println!("  ✓ Downloaded {}", path);
        }

        // Recursively process directories
        for (subdir, subdir_path, current_path) in dir_tasks {
            std::fs::create_dir_all(&subdir_path).into_diagnostic()?;

            download_directory(
                &subdir,
                &subdir_path,
                pds_url,
                did,
                new_blob_map,
                existing_file_cids,
                existing_output_dir,
                current_path,
                downloaded,
                reused,
            )
            .await?;
        }

        Ok(())
    })
}
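// Subfs subjects are plain AT URIs; the parsing in `expand_subfs_in_pull`
// below assumes the canonical three-part shape (illustrative values):
//
//     at://did:plc:abc123xyz/place.wisp.subfs/3kabc123
//          └──── repo ─────┘ └─ collection ─┘ └─ rkey ┘
//
// i.e. strip the `at://` prefix and split the remainder on `/`.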
/// Expand subfs nodes in a directory tree by recursively fetching and merging
/// subfs records
async fn expand_subfs_in_pull<'a>(
    directory: &Directory<'a>,
    pds_url: &Url,
    _did: &str,
) -> miette::Result<Directory<'static>> {
    use crate::place_wisp::subfs::SubfsRecord;
    use jacquard_common::types::value::from_data;
    use jacquard_common::IntoStatic;

    // Recursively fetch ALL subfs records (including nested ones)
    let mut all_subfs_map: HashMap<String, crate::place_wisp::subfs::Directory> = HashMap::new();
    let mut to_fetch = subfs_utils::extract_subfs_uris(directory, String::new());

    if to_fetch.is_empty() {
        return Ok((*directory).clone().into_static());
    }

    println!("Found {} subfs records, fetching recursively...", to_fetch.len());
    let client = reqwest::Client::new();

    // Keep fetching until we've resolved all subfs (including nested ones)
    let mut iteration = 0;
    const MAX_ITERATIONS: usize = 10; // Prevent infinite loops

    while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
        iteration += 1;
        println!("  Iteration {}: fetching {} subfs records...", iteration, to_fetch.len());

        let mut fetch_tasks = Vec::new();

        for (uri, path) in to_fetch.clone() {
            let client = client.clone();
            let pds_url = pds_url.clone();

            fetch_tasks.push(async move {
                let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
                if parts.len() < 3 {
                    return Err(miette::miette!("Invalid subfs URI: {}", uri));
                }

                let did = parts[0];
                let collection = parts[1];
                let rkey = parts[2];

                if collection != "place.wisp.subfs" {
                    return Err(miette::miette!(
                        "Expected place.wisp.subfs collection, got: {}",
                        collection
                    ));
                }

                use jacquard::api::com_atproto::repo::get_record::GetRecord;
                use jacquard_common::types::ident::AtIdentifier;
                use jacquard_common::types::string::{Did as DidType, RecordKey, Rkey as RkeyType};

                let rkey_parsed = RkeyType::new(rkey).into_diagnostic()?;
                let did_parsed = DidType::new(did).into_diagnostic()?;

                let request = GetRecord::new()
                    .repo(AtIdentifier::Did(did_parsed))
                    .collection(CowStr::from("place.wisp.subfs"))
                    .rkey(RecordKey::from(rkey_parsed))
                    .build();

                let response = client
                    .xrpc(pds_url)
                    .send(&request)
                    .await
                    .into_diagnostic()?;

                let record_output = response.into_output().into_diagnostic()?;
                let subfs_record: SubfsRecord = from_data(&record_output.value).into_diagnostic()?;
                let subfs_record_static = subfs_record.into_static();

                Ok::<_, miette::Report>((path, subfs_record_static))
            });
        }

        let results: Vec<_> = futures::future::join_all(fetch_tasks).await;

        // Process results and find nested subfs
        let mut newly_fetched = Vec::new();
        for result in results {
            match result {
                Ok((path, record)) => {
                    println!("  ✓ Fetched subfs at {}", path);

                    // Check for nested subfs in this record
                    let nested_subfs = extract_subfs_from_subfs_dir(&record.root, path.clone());
                    newly_fetched.extend(nested_subfs);

                    all_subfs_map.insert(path, record.root);
                }
                Err(e) => {
                    eprintln!("  ⚠️ Failed to fetch subfs: {}", e);
                }
            }
        }

        // Queue only the subfs we haven't fetched yet (the map is keyed by path)
        to_fetch = newly_fetched
            .into_iter()
            .filter(|(_, path)| !all_subfs_map.contains_key(path))
            .collect();
    }

    if !to_fetch.is_empty() {
        return Err(miette::miette!("Max iterations reached while fetching nested subfs"));
    }

    println!("  Total subfs records fetched: {}", all_subfs_map.len());

    // Now replace all subfs nodes with their content
    Ok(replace_subfs_with_content(directory.clone(), &all_subfs_map, String::new()))
}

/// Extract subfs URIs from a subfs::Directory
fn extract_subfs_from_subfs_dir(
    directory: &crate::place_wisp::subfs::Directory,
    current_path: String,
) -> Vec<(String, String)> {
    let mut uris = Vec::new();

    for entry in &directory.entries {
        let full_path = if current_path.is_empty() {
            entry.name.to_string()
        } else {
            format!("{}/{}", current_path, entry.name)
        };

        match &entry.node {
            crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => {
                uris.push((subfs_node.subject.to_string(), full_path));
            }
            crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
                let nested = extract_subfs_from_subfs_dir(subdir, full_path);
                uris.extend(nested);
            }
            _ => {}
        }
    }

    uris
}
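// Flat vs. nested merging (illustrative): for a subfs entry named "assets"
// whose record contains `a.png` and `b.png`, the merge logic below yields
//
//     flat = true  (the default) -> parent gains a.png and b.png directly
//                                   (entries hoisted, the "assets" name is dropped)
//     flat = false               -> parent gains assets/a.png and assets/b.png
//                                   (entries wrapped in a directory named "assets")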
/// Recursively replace subfs nodes with their actual content
fn replace_subfs_with_content(
    directory: Directory,
    subfs_map: &HashMap<String, crate::place_wisp::subfs::Directory>,
    current_path: String,
) -> Directory<'static> {
    use jacquard_common::IntoStatic;

    let new_entries: Vec<Entry<'static>> = directory
        .entries
        .into_iter()
        .flat_map(|entry| {
            let full_path = if current_path.is_empty() {
                entry.name.to_string()
            } else {
                format!("{}/{}", current_path, entry.name)
            };

            match entry.node {
                EntryNode::Subfs(subfs_node) => {
                    // Check whether we fetched this subfs record
                    if let Some(subfs_dir) = subfs_map.get(&full_path) {
                        let flat = subfs_node.flat.unwrap_or(true); // Default to flat merge

                        let converted_entries: Vec<Entry<'static>> = subfs_dir
                            .entries
                            .iter()
                            .map(|subfs_entry| {
                                convert_subfs_entry_to_fs(subfs_entry.clone().into_static())
                            })
                            .collect();

                        if flat {
                            // Flat merge: hoist subfs entries into the parent
                            println!("  Merging subfs {} (flat)", full_path);
                            converted_entries
                        } else {
                            // Nested: create a directory with the subfs name
                            println!("  Merging subfs {} (nested)", full_path);
                            vec![Entry::new()
                                .name(entry.name.into_static())
                                .node(EntryNode::Directory(Box::new(
                                    Directory::new()
                                        .r#type(CowStr::from("directory"))
                                        .entries(converted_entries)
                                        .build(),
                                )))
                                .build()]
                        }
                    } else {
                        // Subfs not found, skip with a warning
                        eprintln!("  ⚠️ Subfs not found: {}", full_path);
                        vec![]
                    }
                }
                EntryNode::Directory(dir) => {
                    // Recursively process subdirectories
                    vec![Entry::new()
                        .name(entry.name.into_static())
                        .node(EntryNode::Directory(Box::new(replace_subfs_with_content(
                            *dir,
                            subfs_map,
                            full_path,
                        ))))
                        .build()]
                }
                EntryNode::File(_) | EntryNode::Unknown(_) => vec![entry.into_static()],
            }
        })
        .collect();

    Directory::new()
        .r#type(CowStr::from("directory"))
        .entries(new_entries)
        .build()
}
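// Illustrative round trip (a sketch): a subfs entry maps 1:1 onto an fs entry;
// only the Rust types change, not the serialized shape:
//
//     subfs::Entry { name: "logo.png", node: File { blob, .. } }
//         -> fs::Entry { name: "logo.png", node: File { blob, .. } }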
/// Convert a subfs entry to an fs entry (they have the same structure but
/// different generated types)
fn convert_subfs_entry_to_fs(subfs_entry: crate::place_wisp::subfs::Entry<'static>) -> Entry<'static> {
    use jacquard_common::IntoStatic;

    let node = match subfs_entry.node {
        crate::place_wisp::subfs::EntryNode::File(file) => EntryNode::File(Box::new(
            File::new()
                .r#type(file.r#type.into_static())
                .blob(file.blob.into_static())
                .encoding(file.encoding.map(|e| e.into_static()))
                .mime_type(file.mime_type.map(|m| m.into_static()))
                .base64(file.base64)
                .build(),
        )),
        crate::place_wisp::subfs::EntryNode::Directory(dir) => {
            let converted_entries: Vec<Entry<'static>> = dir
                .entries
                .into_iter()
                .map(|e| convert_subfs_entry_to_fs(e.into_static()))
                .collect();

            EntryNode::Directory(Box::new(
                Directory::new()
                    .r#type(dir.r#type.into_static())
                    .entries(converted_entries)
                    .build(),
            ))
        }
        crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => {
            // Nested subfs should have been expanded already; reaching this arm
            // means expansion failed, so degrade to an empty directory.
            eprintln!("  ⚠️ Warning: unexpanded nested subfs, treating it as an empty directory");
            EntryNode::Directory(Box::new(
                Directory::new()
                    .r#type(CowStr::from("directory"))
                    .entries(vec![])
                    .build(),
            ))
        }
        crate::place_wisp::subfs::EntryNode::Unknown(unknown) => EntryNode::Unknown(unknown),
    };

    Entry::new()
        .name(subfs_entry.name.into_static())
        .node(node)
        .build()
}
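// ---------------------------------------------------------------------------
// End-to-end sketch (illustrative, not part of the original module): wiring
// `pull_site` into a binary. Assumes a tokio runtime (implied by the
// reqwest/futures usage above) and placeholder handle/rkey values.
//
//     #[tokio::main]
//     async fn main() -> miette::Result<()> {
//         pull_site(
//             CowStr::from("alice.example.com"), // handle or DID
//             CowStr::from("self"),              // rkey of the place.wisp.fs record
//             PathBuf::from("./my-site"),        // local output directory
//         )
//         .await
//     }
// ---------------------------------------------------------------------------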