Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

use crate::blob_map;
use crate::download;
use crate::metadata::SiteMetadata;
use crate::place_wisp::fs::*;
use crate::subfs_utils;
use jacquard::CowStr;
use jacquard::prelude::IdentityResolver;
use jacquard_common::types::string::Did;
use jacquard_common::xrpc::XrpcExt;
use jacquard_identity::PublicResolver;
use miette::IntoDiagnostic;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use url::Url;

/// Pull a site from the PDS to a local directory
pub async fn pull_site(
    input: CowStr<'static>,
    rkey: CowStr<'static>,
    output_dir: PathBuf,
) -> miette::Result<()> {
    println!("Pulling site {} from {}...", rkey, input);

    // Resolve handle to DID if needed
    let resolver = PublicResolver::default();
    let did = if input.starts_with("did:") {
        Did::new(&input).into_diagnostic()?
    } else {
        // It's a handle, resolve it
        let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
        resolver.resolve_handle(&handle).await.into_diagnostic()?
    };

    // Resolve PDS endpoint for the DID
    let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
    println!("Resolved PDS: {}", pds_url);

    // Create a plain HTTP client for fetching records (no auth needed for public reads)
    println!("Fetching record from PDS...");
    let client = reqwest::Client::new();

    // Use com.atproto.repo.getRecord
    use jacquard::api::com_atproto::repo::get_record::GetRecord;
    use jacquard_common::types::string::Rkey as RkeyType;
    let rkey_parsed = RkeyType::new(&rkey).into_diagnostic()?;

    use jacquard_common::types::ident::AtIdentifier;
    use jacquard_common::types::string::RecordKey;
    let request = GetRecord::new()
        .repo(AtIdentifier::Did(did.clone()))
        .collection(CowStr::from("place.wisp.fs"))
        .rkey(RecordKey::from(rkey_parsed))
        .build();

    let response = client
        .xrpc(pds_url.clone())
        .send(&request)
        .await
        .into_diagnostic()?;

    let record_output = response.into_output().into_diagnostic()?;
    let record_cid = record_output.cid.as_ref().map(|c| c.to_string()).unwrap_or_default();

    // Parse the record value as Fs
    use jacquard_common::types::value::from_data;
    let fs_record: Fs = from_data(&record_output.value).into_diagnostic()?;

    let file_count = fs_record.file_count.map(|c| c.to_string()).unwrap_or_else(|| "?".to_string());
    println!("Found site '{}' with {} files (in main record)", fs_record.site, file_count);

    // Check for and expand subfs nodes
    // Note: We use a custom expand function for pull since we don't have an Agent
    let expanded_root = expand_subfs_in_pull_with_client(&fs_record.root, &client, &pds_url).await?;
    let total_file_count = subfs_utils::count_files_in_directory(&expanded_root);

    if total_file_count as i64 != fs_record.file_count.unwrap_or(0) {
        println!("Total files after expanding subfs: {}", total_file_count);
    }

    // Load existing metadata for incremental updates
    let existing_metadata = SiteMetadata::load(&output_dir)?;
    let existing_file_cids = existing_metadata
        .as_ref()
        .map(|m| m.file_cids.clone())
        .unwrap_or_default();

    // Extract blob map from the expanded manifest
    let new_blob_map = blob_map::extract_blob_map(&expanded_root);
    let new_file_cids: HashMap<String, String> = new_blob_map
        .iter()
        .map(|(path, (_blob_ref, cid))| (path.clone(), cid.clone()))
        .collect();
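
    // Incremental reuse hinges on comparing these two maps: a path whose CID
    // is unchanged between existing_file_cids and new_file_cids is copied from
    // the previous output instead of re-downloaded (see download_directory).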

    // Clean up any leftover temp directories from previous failed attempts
    let parent = output_dir.parent().unwrap_or_else(|| std::path::Path::new("."));
    let output_name = output_dir.file_name().unwrap_or_else(|| std::ffi::OsStr::new("site")).to_string_lossy();
    let temp_prefix = format!(".tmp-{}-", output_name);

    if let Ok(entries) = parent.read_dir() {
        for entry in entries.flatten() {
            let name = entry.file_name();
            if name.to_string_lossy().starts_with(&temp_prefix) {
                let _ = std::fs::remove_dir_all(entry.path());
            }
        }
    }

    // Check if we need to update (verify files actually exist, not just metadata)
    if let Some(metadata) = &existing_metadata {
        if metadata.record_cid == record_cid {
            // Verify that the output directory actually exists and has the expected files
            let has_all_files = output_dir.exists() && {
                // Count actual files on disk (excluding metadata)
                let mut actual_file_count = 0;
                if let Ok(entries) = std::fs::read_dir(&output_dir) {
                    for entry in entries.flatten() {
                        let name = entry.file_name();
                        if !name.to_string_lossy().starts_with(".wisp-metadata") {
                            if entry.path().is_file() {
                                actual_file_count += 1;
                            }
                        }
                    }
                }

                // Compare with expected file count from metadata
                let expected_count = metadata.file_cids.len();
                actual_file_count > 0 && actual_file_count >= expected_count
            };

            if has_all_files {
                println!("Site is already up to date!");
                return Ok(());
            } else {
                println!("Site metadata exists but files are missing, re-downloading...");
            }
        }
    }

    // Create temporary directory for atomic update
    // Place temp dir in parent directory to avoid issues with non-existent output_dir
    let parent = output_dir.parent().unwrap_or_else(|| std::path::Path::new("."));
    let temp_dir_name = format!(
        ".tmp-{}-{}",
        output_dir.file_name().unwrap_or_else(|| std::ffi::OsStr::new("site")).to_string_lossy(),
        chrono::Utc::now().timestamp()
    );
    let temp_dir = parent.join(temp_dir_name);
    std::fs::create_dir_all(&temp_dir).into_diagnostic()?;

    println!("Downloading files...");
    let mut downloaded = 0;
    let mut reused = 0;

    // Download files recursively (using expanded root)
    let download_result = download_directory(
        &expanded_root,
        &temp_dir,
        &pds_url,
        did.as_str(),
        &new_blob_map,
        &existing_file_cids,
        &output_dir,
        String::new(),
        &mut downloaded,
        &mut reused,
    )
    .await;

    // If download failed, clean up temp directory
    if let Err(e) = download_result {
        let _ = std::fs::remove_dir_all(&temp_dir);
        return Err(e);
    }

    println!("Downloaded {} files, reused {} files", downloaded, reused);

    // Save metadata
    let metadata = SiteMetadata::new(record_cid, new_file_cids);
    metadata.save(&temp_dir)?;
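
    // From here on, the pull is promoted atomically: everything (including the
    // metadata file) was written into temp_dir, so an interrupted or failed
    // move leaves the previous site contents untouched.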

    // Move files from temp to output directory
    let output_abs = std::fs::canonicalize(&output_dir).unwrap_or_else(|_| output_dir.clone());
    let current_dir = std::env::current_dir().into_diagnostic()?;

    // Special handling for pulling to current directory
    if output_abs == current_dir {
        // Move files from temp to current directory
        for entry in std::fs::read_dir(&temp_dir).into_diagnostic()? {
            let entry = entry.into_diagnostic()?;
            let dest = current_dir.join(entry.file_name());

            // Remove existing file/dir if it exists
            if dest.exists() {
                if dest.is_dir() {
                    std::fs::remove_dir_all(&dest).into_diagnostic()?;
                } else {
                    std::fs::remove_file(&dest).into_diagnostic()?;
                }
            }

            // Move from temp to current dir
            std::fs::rename(entry.path(), dest).into_diagnostic()?;
        }

        // Clean up temp directory
        std::fs::remove_dir_all(&temp_dir).into_diagnostic()?;
    } else {
        // If output directory exists and has content, remove it first
        if output_dir.exists() {
            std::fs::remove_dir_all(&output_dir).into_diagnostic()?;
        }

        // Ensure parent directory exists
        if let Some(parent) = output_dir.parent() {
            if !parent.as_os_str().is_empty() && !parent.exists() {
                std::fs::create_dir_all(parent).into_diagnostic()?;
            }
        }

        // Rename temp to final location
        if let Err(e) = std::fs::rename(&temp_dir, &output_dir) {
            // Clean up temp directory on failure
            let _ = std::fs::remove_dir_all(&temp_dir);
            return Err(miette::miette!("Failed to move temp directory: {}", e));
        }
    }

    println!("✓ Site pulled successfully to {}", output_dir.display());

    Ok(())
}
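
// Usage sketch (hypothetical caller; the handle, rkey, and output path below
// are illustrative, and a tokio runtime is assumed):
//
//     pull_site(
//         CowStr::from("alice.example.com"), // handle or DID of the site owner
//         CowStr::from("self"),              // rkey of the place.wisp.fs record
//         PathBuf::from("./site"),           // local output directory
//     )
//     .await?;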

/// Recursively download a directory with concurrent downloads
fn download_directory<'a>(
    dir: &'a Directory<'_>,
    output_dir: &'a Path,
    pds_url: &'a Url,
    did: &'a str,
    new_blob_map: &'a HashMap<String, (jacquard_common::types::blob::BlobRef<'static>, String)>,
    existing_file_cids: &'a HashMap<String, String>,
    existing_output_dir: &'a Path,
    path_prefix: String,
    downloaded: &'a mut usize,
    reused: &'a mut usize,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send + 'a>> {
    Box::pin(async move {
        use futures::stream::{self, StreamExt};

        // Collect download tasks and directory tasks separately
        struct DownloadTask {
            path: String,
            output_path: PathBuf,
            blob: jacquard_common::types::blob::BlobRef<'static>,
            base64: bool,
            gzip: bool,
        }

        struct CopyTask {
            path: String,
            from: PathBuf,
            to: PathBuf,
        }

        let mut download_tasks = Vec::new();
        let mut copy_tasks = Vec::new();
        let mut dir_tasks = Vec::new();

        for entry in &dir.entries {
            let entry_name = entry.name.as_str();
            let current_path = if path_prefix.is_empty() {
                entry_name.to_string()
            } else {
                format!("{}/{}", path_prefix, entry_name)
            };

            match &entry.node {
                EntryNode::File(file) => {
                    let output_path = output_dir.join(entry_name);

                    // Check if file CID matches existing
                    let should_copy = if let Some((_blob_ref, new_cid)) = new_blob_map.get(&current_path) {
                        if let Some(existing_cid) = existing_file_cids.get(&current_path) {
                            if existing_cid == new_cid {
                                let existing_path = existing_output_dir.join(&current_path);
                                if existing_path.exists() {
                                    copy_tasks.push(CopyTask {
                                        path: current_path.clone(),
                                        from: existing_path,
                                        to: output_path.clone(),
                                    });
                                    true
                                } else {
                                    false
                                }
                            } else {
                                false
                            }
                        } else {
                            false
                        }
                    } else {
                        false
                    };

                    if !should_copy {
                        use jacquard_common::IntoStatic;
                        // File needs to be downloaded
                        download_tasks.push(DownloadTask {
                            path: current_path,
                            output_path,
                            blob: file.blob.clone().into_static(),
                            base64: file.base64.unwrap_or(false),
                            gzip: file.encoding.as_ref().map(|e| e.as_str() == "gzip").unwrap_or(false),
                        });
                    }
                }
                EntryNode::Directory(subdir) => {
                    let subdir_path = output_dir.join(entry_name);
                    dir_tasks.push((subdir.as_ref().clone(), subdir_path, current_path));
                }
                EntryNode::Subfs(_) => {
                    println!("  ⚠ Skipping subfs node at {} (should have been expanded)", current_path);
                }
                EntryNode::Unknown(_) => {
                    println!("  ⚠ Skipping unknown node type for {}", current_path);
                }
            }
        }

        // Execute copy tasks (fast, do them all)
        for task in copy_tasks {
            std::fs::copy(&task.from, &task.to).into_diagnostic()?;
            *reused += 1;
            println!("  ✓ Reused {}", task.path);
        }

        // Execute download tasks with concurrency limit (20 concurrent downloads)
        const DOWNLOAD_CONCURRENCY: usize = 20;

        let pds_url_clone = pds_url.clone();
        let did_str = did.to_string();

        let download_results: Vec<miette::Result<(String, PathBuf, Vec<u8>)>> = stream::iter(download_tasks)
            .map(|task| {
                let pds = pds_url_clone.clone();
                let did_copy = did_str.clone();

                async move {
                    println!("  ↓ Downloading {}", task.path);
                    let data = download::download_and_decompress_blob(
                        &pds,
                        &task.blob,
                        &did_copy,
                        task.base64,
                        task.gzip,
                    )
                    .await?;

                    Ok::<_, miette::Report>((task.path, task.output_path, data))
                }
            })
            .buffer_unordered(DOWNLOAD_CONCURRENCY)
            .collect()
            .await;

        // Write downloaded files to disk
        for result in download_results {
            let (path, output_path, data) = result?;
            std::fs::write(&output_path, data).into_diagnostic()?;
            *downloaded += 1;
            println!("  ✓ Downloaded {}", path);
        }

        // Recursively process directories
        for (subdir, subdir_path, current_path) in dir_tasks {
            std::fs::create_dir_all(&subdir_path).into_diagnostic()?;

            download_directory(
                &subdir,
                &subdir_path,
                pds_url,
                did,
                new_blob_map,
                existing_file_cids,
                existing_output_dir,
                current_path,
                downloaded,
                reused,
            )
            .await?;
        }

        Ok(())
    })
}
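
// Subfs expansion below is iterative rather than recursive: each pass fetches
// the currently known place.wisp.subfs records concurrently, scans the results
// for nested subfs references, and repeats, with MAX_ITERATIONS bounding the
// nesting depth (and guarding against reference cycles).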

/// Expand subfs nodes in a directory tree by fetching and merging subfs records,
/// including nested ones.
/// Uses a reqwest client directly for the pull command (no agent needed)
async fn expand_subfs_in_pull_with_client<'a>(
    directory: &Directory<'a>,
    client: &reqwest::Client,
    pds_url: &Url,
) -> miette::Result<Directory<'static>> {
    use jacquard_common::IntoStatic;
    use jacquard_common::types::value::from_data;
    use crate::place_wisp::subfs::SubfsRecord;

    let mut all_subfs_map: HashMap<String, crate::place_wisp::subfs::Directory> = HashMap::new();
    let mut to_fetch = subfs_utils::extract_subfs_uris(directory, String::new());

    if to_fetch.is_empty() {
        return Ok((*directory).clone().into_static());
    }

    println!("Found {} subfs records, fetching recursively...", to_fetch.len());

    let mut iteration = 0;
    const MAX_ITERATIONS: usize = 10;

    while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
        iteration += 1;
        println!("  Iteration {}: fetching {} subfs records...", iteration, to_fetch.len());

        let mut fetch_tasks = Vec::new();

        for (uri, path) in to_fetch.clone() {
            let client = client.clone();
            let pds_url = pds_url.clone();

            fetch_tasks.push(async move {
                // Parse URI
                let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
                if parts.len() < 3 {
                    return Err(miette::miette!("Invalid subfs URI: {}", uri));
                }

                let did_str = parts[0];
                let collection = parts[1];
                let rkey_str = parts[2];

                if collection != "place.wisp.subfs" {
                    return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection));
                }

                // Fetch using GetRecord
                use jacquard::api::com_atproto::repo::get_record::GetRecord;
                use jacquard_common::types::string::{Rkey as RkeyType, Did as DidType, RecordKey};
                use jacquard_common::types::ident::AtIdentifier;

                let rkey_parsed = RkeyType::new(rkey_str).into_diagnostic()?;
                let did_parsed = DidType::new(did_str).into_diagnostic()?;

                let request = GetRecord::new()
                    .repo(AtIdentifier::Did(did_parsed))
                    .collection(CowStr::from("place.wisp.subfs"))
                    .rkey(RecordKey::from(rkey_parsed))
                    .build();

                let response = client
                    .xrpc(pds_url)
                    .send(&request)
                    .await
                    .into_diagnostic()?;

                let record_output = response.into_output().into_diagnostic()?;
                let subfs_record: SubfsRecord = from_data(&record_output.value).into_diagnostic()?;

                Ok::<_, miette::Report>((path, subfs_record.into_static()))
            });
        }

        let results: Vec<_> = futures::future::join_all(fetch_tasks).await;

        // Process results and find nested subfs
        let mut newly_found_uris = Vec::new();
        for result in results {
            match result {
                Ok((path, record)) => {
                    println!("  ✓ Fetched subfs at {}", path);

                    // Extract nested subfs URIs
                    let nested_uris = extract_subfs_uris_from_subfs_dir(&record.root, path.clone());
                    newly_found_uris.extend(nested_uris);

                    all_subfs_map.insert(path, record.root);
                }
                Err(e) => {
                    eprintln!("  ⚠️ Failed to fetch subfs: {}", e);
                }
            }
        }

        // Filter out already-fetched paths
        to_fetch = newly_found_uris
            .into_iter()
            .filter(|(_, path)| !all_subfs_map.contains_key(path))
            .collect();
    }

    if iteration >= MAX_ITERATIONS {
        eprintln!("⚠️ Max iterations reached while fetching nested subfs");
    }

    println!("  Total subfs records fetched: {}", all_subfs_map.len());

    // Now replace all subfs nodes with their content
    Ok(replace_subfs_with_content(directory.clone(), &all_subfs_map, String::new()))
}

/// Extract subfs URIs from a subfs::Directory (helper for pull)
fn extract_subfs_uris_from_subfs_dir(
    directory: &crate::place_wisp::subfs::Directory,
    current_path: String,
) -> Vec<(String, String)> {
    let mut uris = Vec::new();

    for entry in &directory.entries {
        let full_path = if current_path.is_empty() {
            entry.name.to_string()
        } else {
            format!("{}/{}", current_path, entry.name)
        };

        match &entry.node {
            crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => {
                uris.push((subfs_node.subject.to_string(), full_path.clone()));
            }
            crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
                let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path);
                uris.extend(nested);
            }
            _ => {}
        }
    }

    uris
}
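
// Merge semantics for a subfs entry named "assets" inside "static/"
// (illustrative paths, not real data):
//   flat = true (default): the subfs entries are hoisted into the parent,
//                          e.g. static/app.css, static/app.js
//   flat = false:          they are nested under a directory named after the
//                          entry, e.g. static/assets/app.css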

/// Recursively replace subfs nodes with their actual content
fn replace_subfs_with_content(
    directory: Directory,
    subfs_map: &HashMap<String, crate::place_wisp::subfs::Directory>,
    current_path: String,
) -> Directory<'static> {
    use jacquard_common::IntoStatic;

    let new_entries: Vec<Entry<'static>> = directory
        .entries
        .into_iter()
        .flat_map(|entry| {
            let full_path = if current_path.is_empty() {
                entry.name.to_string()
            } else {
                format!("{}/{}", current_path, entry.name)
            };

            match entry.node {
                EntryNode::Subfs(subfs_node) => {
                    // Check if we have this subfs record
                    if let Some(subfs_dir) = subfs_map.get(&full_path) {
                        let flat = subfs_node.flat.unwrap_or(true); // Default to flat merge

                        if flat {
                            // Flat merge: hoist subfs entries into parent
                            println!("  Merging subfs {} (flat)", full_path);
                            let converted_entries: Vec<Entry<'static>> = subfs_dir
                                .entries
                                .iter()
                                .map(|subfs_entry| convert_subfs_entry_to_fs(subfs_entry.clone().into_static()))
                                .collect();

                            converted_entries
                        } else {
                            // Nested: create a directory with the subfs name
                            println!("  Merging subfs {} (nested)", full_path);
                            let converted_entries: Vec<Entry<'static>> = subfs_dir
                                .entries
                                .iter()
                                .map(|subfs_entry| convert_subfs_entry_to_fs(subfs_entry.clone().into_static()))
                                .collect();

                            vec![Entry::new()
                                .name(entry.name.into_static())
                                .node(EntryNode::Directory(Box::new(
                                    Directory::new()
                                        .r#type(CowStr::from("directory"))
                                        .entries(converted_entries)
                                        .build(),
                                )))
                                .build()]
                        }
                    } else {
                        // Subfs not found, skip with warning
                        eprintln!("  ⚠️ Subfs not found: {}", full_path);
                        vec![]
                    }
                }
                EntryNode::Directory(dir) => {
                    // Recursively process subdirectories
                    vec![Entry::new()
                        .name(entry.name.into_static())
                        .node(EntryNode::Directory(Box::new(
                            replace_subfs_with_content(*dir, subfs_map, full_path),
                        )))
                        .build()]
                }
                EntryNode::File(_) => {
                    vec![entry.into_static()]
                }
                EntryNode::Unknown(_) => {
                    vec![entry.into_static()]
                }
            }
        })
        .collect();

    Directory::new()
        .r#type(CowStr::from("directory"))
        .entries(new_entries)
        .build()
}
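
// place.wisp.fs and place.wisp.subfs generate structurally identical but
// distinct Rust types, so the conversion below rebuilds each entry field by
// field rather than casting between the two lexicons' node types.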

/// Convert a subfs entry to a fs entry (they have the same structure but different types)
fn convert_subfs_entry_to_fs(subfs_entry: crate::place_wisp::subfs::Entry<'static>) -> Entry<'static> {
    use jacquard_common::IntoStatic;

    let node = match subfs_entry.node {
        crate::place_wisp::subfs::EntryNode::File(file) => {
            EntryNode::File(Box::new(
                File::new()
                    .r#type(file.r#type.into_static())
                    .blob(file.blob.into_static())
                    .encoding(file.encoding.map(|e| e.into_static()))
                    .mime_type(file.mime_type.map(|m| m.into_static()))
                    .base64(file.base64)
                    .build(),
            ))
        }
        crate::place_wisp::subfs::EntryNode::Directory(dir) => {
            let converted_entries: Vec<Entry<'static>> = dir
                .entries
                .into_iter()
                .map(|e| convert_subfs_entry_to_fs(e.into_static()))
                .collect();

            EntryNode::Directory(Box::new(
                Directory::new()
                    .r#type(dir.r#type.into_static())
                    .entries(converted_entries)
                    .build(),
            ))
        }
        crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => {
            // Nested subfs should have been expanded already - if we get here, it means
            // expansion failed. Treat it like a directory reference that should have been
            // expanded.
            eprintln!("  ⚠️ Warning: unexpanded nested subfs at path, treating as empty directory");
            EntryNode::Directory(Box::new(
                Directory::new()
                    .r#type(CowStr::from("directory"))
                    .entries(vec![])
                    .build(),
            ))
        }
        crate::place_wisp::subfs::EntryNode::Unknown(unknown) => EntryNode::Unknown(unknown),
    };

    Entry::new()
        .name(subfs_entry.name.into_static())
        .node(node)
        .build()
}