//! Part of the wisp.place monorepo — a static site hosting service built on
//! top of the AT Protocol (wisp.place).
1use crate::blob_map;
2use crate::download;
3use crate::metadata::SiteMetadata;
4use crate::place_wisp::fs::*;
5use crate::subfs_utils;
6use jacquard::CowStr;
7use jacquard::prelude::IdentityResolver;
8use jacquard_common::types::string::Did;
9use jacquard_common::xrpc::XrpcExt;
10use jacquard_identity::PublicResolver;
11use miette::IntoDiagnostic;
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use url::Url;
15
/// Pull a site from the PDS to a local directory.
///
/// `input` is either a DID (`did:...`) or a handle; handles are resolved via
/// the public resolver. `rkey` identifies the `place.wisp.fs` record to fetch.
/// Any `place.wisp.subfs` sub-records are fetched and merged into the tree,
/// then blobs are downloaded into a temporary sibling directory which is
/// atomically swapped into `output_dir`. Files whose CIDs match a previous
/// pull (tracked in `.wisp-metadata`) are reused instead of re-downloaded.
pub async fn pull_site(
    input: CowStr<'static>,
    rkey: CowStr<'static>,
    output_dir: PathBuf,
) -> miette::Result<()> {
    println!("Pulling site {} from {}...", rkey, input);

    // Resolve handle to DID if needed
    let resolver = PublicResolver::default();
    let did = if input.starts_with("did:") {
        Did::new(&input).into_diagnostic()?
    } else {
        // It's a handle, resolve it
        let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
        resolver.resolve_handle(&handle).await.into_diagnostic()?
    };

    // Resolve PDS endpoint for the DID
    let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
    println!("Resolved PDS: {}", pds_url);

    // Create a temporary agent for fetching records (no auth needed for public reads)
    println!("Fetching record from PDS...");
    let client = reqwest::Client::new();

    // Use com.atproto.repo.getRecord
    use jacquard::api::com_atproto::repo::get_record::GetRecord;
    use jacquard_common::types::string::Rkey as RkeyType;
    let rkey_parsed = RkeyType::new(&rkey).into_diagnostic()?;

    use jacquard_common::types::ident::AtIdentifier;
    use jacquard_common::types::string::RecordKey;
    let request = GetRecord::new()
        .repo(AtIdentifier::Did(did.clone()))
        .collection(CowStr::from("place.wisp.fs"))
        .rkey(RecordKey::from(rkey_parsed))
        .build();

    let response = client
        .xrpc(pds_url.clone())
        .send(&request)
        .await
        .into_diagnostic()?;

    let record_output = response.into_output().into_diagnostic()?;
    // Record CID is used below to decide whether the local copy is current.
    // NOTE(review): falls back to "" when the PDS omits the CID; an empty CID
    // would compare equal to an empty stored CID — confirm the PDS always
    // returns one here.
    let record_cid = record_output.cid.as_ref().map(|c| c.to_string()).unwrap_or_default();

    // Parse the record value as Fs
    use jacquard_common::types::value::from_data;
    let fs_record: Fs = from_data(&record_output.value).into_diagnostic()?;

    let file_count = fs_record.file_count.map(|c| c.to_string()).unwrap_or_else(|| "?".to_string());
    println!("Found site '{}' with {} files (in main record)", fs_record.site, file_count);

    // Check for and expand subfs nodes
    // Note: We use a custom expand function for pull since we don't have an Agent
    let expanded_root = expand_subfs_in_pull_with_client(&fs_record.root, &client, &pds_url).await?;
    let total_file_count = subfs_utils::count_files_in_directory(&expanded_root);

    // Only report the expanded total when it differs from the record's claim.
    if total_file_count as i64 != fs_record.file_count.unwrap_or(0) {
        println!("Total files after expanding subfs: {}", total_file_count);
    }

    // Load existing metadata for incremental updates
    let existing_metadata = SiteMetadata::load(&output_dir)?;
    let existing_file_cids = existing_metadata
        .as_ref()
        .map(|m| m.file_cids.clone())
        .unwrap_or_default();

    // Extract blob map from the expanded manifest
    let new_blob_map = blob_map::extract_blob_map(&expanded_root);
    // path -> CID mapping that will be persisted as the new metadata.
    let new_file_cids: HashMap<String, String> = new_blob_map
        .iter()
        .map(|(path, (_blob_ref, cid))| (path.clone(), cid.clone()))
        .collect();

    // Clean up any leftover temp directories from previous failed attempts
    let parent = output_dir.parent().unwrap_or_else(|| std::path::Path::new("."));
    let output_name = output_dir.file_name().unwrap_or_else(|| std::ffi::OsStr::new("site")).to_string_lossy();
    let temp_prefix = format!(".tmp-{}-", output_name);

    if let Ok(entries) = parent.read_dir() {
        for entry in entries.flatten() {
            let name = entry.file_name();
            if name.to_string_lossy().starts_with(&temp_prefix) {
                // Best-effort removal; a failure here is not fatal.
                let _ = std::fs::remove_dir_all(entry.path());
            }
        }
    }

    // Check if we need to update (verify files actually exist, not just metadata)
    if let Some(metadata) = &existing_metadata {
        if metadata.record_cid == record_cid {
            // Verify that the output directory actually exists and has the expected files
            let has_all_files = output_dir.exists() && {
                // Count actual files on disk (excluding metadata)
                // NOTE(review): read_dir is not recursive, so this counts only
                // top-level files while `file_cids` may contain nested paths;
                // for nested sites the check can under-count and trigger a
                // re-download — confirm whether that is acceptable.
                let mut actual_file_count = 0;
                if let Ok(entries) = std::fs::read_dir(&output_dir) {
                    for entry in entries.flatten() {
                        let name = entry.file_name();
                        if !name.to_string_lossy().starts_with(".wisp-metadata") {
                            if entry.path().is_file() {
                                actual_file_count += 1;
                            }
                        }
                    }
                }

                // Compare with expected file count from metadata
                let expected_count = metadata.file_cids.len();
                actual_file_count > 0 && actual_file_count >= expected_count
            };

            if has_all_files {
                println!("Site is already up to date!");
                return Ok(());
            } else {
                println!("Site metadata exists but files are missing, re-downloading...");
            }
        }
    }

    // Create temporary directory for atomic update
    // Place temp dir in parent directory to avoid issues with non-existent output_dir
    let parent = output_dir.parent().unwrap_or_else(|| std::path::Path::new("."));
    // Timestamp suffix keeps concurrent/retried pulls from colliding.
    let temp_dir_name = format!(
        ".tmp-{}-{}",
        output_dir.file_name().unwrap_or_else(|| std::ffi::OsStr::new("site")).to_string_lossy(),
        chrono::Utc::now().timestamp()
    );
    let temp_dir = parent.join(temp_dir_name);
    std::fs::create_dir_all(&temp_dir).into_diagnostic()?;

    println!("Downloading files...");
    // Running totals, updated by download_directory across the whole recursion.
    let mut downloaded = 0;
    let mut reused = 0;

    // Download files recursively (using expanded root)
    let download_result = download_directory(
        &expanded_root,
        &temp_dir,
        &pds_url,
        did.as_str(),
        &new_blob_map,
        &existing_file_cids,
        &output_dir,
        String::new(),
        &mut downloaded,
        &mut reused,
    )
    .await;

    // If download failed, clean up temp directory
    if let Err(e) = download_result {
        let _ = std::fs::remove_dir_all(&temp_dir);
        return Err(e);
    }

    println!(
        "Downloaded {} files, reused {} files",
        downloaded, reused
    );

    // Save metadata
    let metadata = SiteMetadata::new(record_cid, new_file_cids);
    metadata.save(&temp_dir)?;

    // Move files from temp to output directory
    // canonicalize fails if output_dir doesn't exist yet; fall back to the
    // given path so the comparison below simply misses the current-dir case.
    let output_abs = std::fs::canonicalize(&output_dir).unwrap_or_else(|_| output_dir.clone());
    let current_dir = std::env::current_dir().into_diagnostic()?;

    // Special handling for pulling to current directory
    if output_abs == current_dir {
        // Move files from temp to current directory
        // (we cannot rename over a non-empty, in-use directory, so move
        // entry-by-entry instead)
        for entry in std::fs::read_dir(&temp_dir).into_diagnostic()? {
            let entry = entry.into_diagnostic()?;
            let dest = current_dir.join(entry.file_name());

            // Remove existing file/dir if it exists
            if dest.exists() {
                if dest.is_dir() {
                    std::fs::remove_dir_all(&dest).into_diagnostic()?;
                } else {
                    std::fs::remove_file(&dest).into_diagnostic()?;
                }
            }

            // Move from temp to current dir
            std::fs::rename(entry.path(), dest).into_diagnostic()?;
        }

        // Clean up temp directory
        std::fs::remove_dir_all(&temp_dir).into_diagnostic()?;
    } else {
        // If output directory exists and has content, remove it first
        if output_dir.exists() {
            std::fs::remove_dir_all(&output_dir).into_diagnostic()?;
        }

        // Ensure parent directory exists
        if let Some(parent) = output_dir.parent() {
            if !parent.as_os_str().is_empty() && !parent.exists() {
                std::fs::create_dir_all(parent).into_diagnostic()?;
            }
        }

        // Rename temp to final location
        match std::fs::rename(&temp_dir, &output_dir) {
            Ok(_) => {},
            Err(e) => {
                // Clean up temp directory on failure
                let _ = std::fs::remove_dir_all(&temp_dir);
                return Err(miette::miette!("Failed to move temp directory: {}", e));
            }
        }
    }

    println!("✓ Site pulled successfully to {}", output_dir.display());

    Ok(())
}
239
/// Recursively download a directory with concurrent downloads.
///
/// Walks `dir`, reusing files from `existing_output_dir` when their CID is
/// unchanged according to `existing_file_cids`/`new_blob_map`, and downloading
/// everything else from the PDS with bounded concurrency. `path_prefix` is the
/// slash-joined path of `dir` relative to the site root; `downloaded` and
/// `reused` are running totals shared across the whole recursion.
///
/// Returns a boxed, pinned future because the function calls itself in async
/// position (recursive `async fn`s need an indirection for a sized future).
fn download_directory<'a>(
    dir: &'a Directory<'_>,
    output_dir: &'a Path,
    pds_url: &'a Url,
    did: &'a str,
    new_blob_map: &'a HashMap<String, (jacquard_common::types::blob::BlobRef<'static>, String)>,
    existing_file_cids: &'a HashMap<String, String>,
    existing_output_dir: &'a Path,
    path_prefix: String,
    downloaded: &'a mut usize,
    reused: &'a mut usize,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send + 'a>> {
    Box::pin(async move {
        use futures::stream::{self, StreamExt};

        // Collect download tasks and directory tasks separately

        // A blob that must be fetched from the PDS.
        struct DownloadTask {
            path: String,
            output_path: PathBuf,
            blob: jacquard_common::types::blob::BlobRef<'static>,
            base64: bool,
            gzip: bool,
        }

        // An unchanged file that can be copied from the previous output dir.
        struct CopyTask {
            path: String,
            from: PathBuf,
            to: PathBuf,
        }

        let mut download_tasks = Vec::new();
        let mut copy_tasks = Vec::new();
        let mut dir_tasks = Vec::new();

        // First pass: classify every entry without doing any I/O yet.
        for entry in &dir.entries {
            let entry_name = entry.name.as_str();
            let current_path = if path_prefix.is_empty() {
                entry_name.to_string()
            } else {
                format!("{}/{}", path_prefix, entry_name)
            };

            match &entry.node {
                EntryNode::File(file) => {
                    let output_path = output_dir.join(entry_name);

                    // Check if file CID matches existing
                    // should_copy is true only when the CID is unchanged AND
                    // the old file is actually present on disk.
                    let should_copy = if let Some((_blob_ref, new_cid)) = new_blob_map.get(&current_path) {
                        if let Some(existing_cid) = existing_file_cids.get(&current_path) {
                            if existing_cid == new_cid {
                                let existing_path = existing_output_dir.join(&current_path);
                                if existing_path.exists() {
                                    copy_tasks.push(CopyTask {
                                        path: current_path.clone(),
                                        from: existing_path,
                                        to: output_path.clone(),
                                    });
                                    true
                                } else {
                                    false
                                }
                            } else {
                                false
                            }
                        } else {
                            false
                        }
                    } else {
                        false
                    };

                    if !should_copy {
                        use jacquard_common::IntoStatic;
                        // File needs to be downloaded
                        download_tasks.push(DownloadTask {
                            path: current_path,
                            output_path,
                            blob: file.blob.clone().into_static(),
                            base64: file.base64.unwrap_or(false),
                            gzip: file.encoding.as_ref().map(|e| e.as_str() == "gzip").unwrap_or(false),
                        });
                    }
                }
                EntryNode::Directory(subdir) => {
                    let subdir_path = output_dir.join(entry_name);
                    dir_tasks.push((subdir.as_ref().clone(), subdir_path, current_path));
                }
                EntryNode::Subfs(_) => {
                    // Subfs nodes are resolved before download; seeing one here
                    // means expansion failed for this node.
                    println!("  ⚠ Skipping subfs node at {} (should have been expanded)", current_path);
                }
                EntryNode::Unknown(_) => {
                    println!("  ⚠ Skipping unknown node type for {}", current_path);
                }
            }
        }

        // Execute copy tasks (fast, do them all)
        for task in copy_tasks {
            std::fs::copy(&task.from, &task.to).into_diagnostic()?;
            *reused += 1;
            println!("  ✓ Reused {}", task.path);
        }

        // Execute download tasks with concurrency limit (20 concurrent downloads)
        const DOWNLOAD_CONCURRENCY: usize = 20;

        // Owned copies so each spawned future is self-contained.
        let pds_url_clone = pds_url.clone();
        let did_str = did.to_string();

        // buffer_unordered runs up to DOWNLOAD_CONCURRENCY fetches at once;
        // completion order is arbitrary, so results carry their own paths.
        let download_results: Vec<miette::Result<(String, PathBuf, Vec<u8>)>> = stream::iter(download_tasks)
            .map(|task| {
                let pds = pds_url_clone.clone();
                let did_copy = did_str.clone();

                async move {
                    println!("  ↓ Downloading {}", task.path);
                    let data = download::download_and_decompress_blob(
                        &pds,
                        &task.blob,
                        &did_copy,
                        task.base64,
                        task.gzip,
                    )
                    .await?;

                    Ok::<_, miette::Report>((task.path, task.output_path, data))
                }
            })
            .buffer_unordered(DOWNLOAD_CONCURRENCY)
            .collect()
            .await;

        // Write downloaded files to disk
        // (the first failed download aborts the whole directory via `?`)
        for result in download_results {
            let (path, output_path, data) = result?;
            std::fs::write(&output_path, data).into_diagnostic()?;
            *downloaded += 1;
            println!("  ✓ Downloaded {}", path);
        }

        // Recursively process directories
        for (subdir, subdir_path, current_path) in dir_tasks {
            std::fs::create_dir_all(&subdir_path).into_diagnostic()?;

            download_directory(
                &subdir,
                &subdir_path,
                pds_url,
                did,
                new_blob_map,
                existing_file_cids,
                existing_output_dir,
                current_path,
                downloaded,
                reused,
            )
            .await?;
        }

        Ok(())
    })
}
403
/// Expand subfs nodes in a directory tree by fetching and merging subfs records (RECURSIVELY)
/// Uses reqwest client directly for pull command (no agent needed)
///
/// Fetch proceeds in rounds: each round fetches all currently-known subfs
/// URIs concurrently, then scans the fetched trees for nested subfs URIs to
/// fetch in the next round. Rounds are capped at `MAX_ITERATIONS` to guard
/// against reference cycles. Individual fetch failures are logged and the
/// corresponding node is later dropped by `replace_subfs_with_content`.
async fn expand_subfs_in_pull_with_client<'a>(
    directory: &Directory<'a>,
    client: &reqwest::Client,
    pds_url: &Url,
) -> miette::Result<Directory<'static>> {
    use jacquard_common::IntoStatic;
    use jacquard_common::types::value::from_data;
    use crate::place_wisp::subfs::SubfsRecord;

    // Maps the slash-joined path of a subfs node to its fetched root directory.
    let mut all_subfs_map: HashMap<String, crate::place_wisp::subfs::Directory> = HashMap::new();
    let mut to_fetch = subfs_utils::extract_subfs_uris(directory, String::new());

    // Fast path: no subfs nodes at all, just take ownership of the tree.
    if to_fetch.is_empty() {
        return Ok((*directory).clone().into_static());
    }

    println!("Found {} subfs records, fetching recursively...", to_fetch.len());

    let mut iteration = 0;
    const MAX_ITERATIONS: usize = 10;

    while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
        iteration += 1;
        println!("  Iteration {}: fetching {} subfs records...", iteration, to_fetch.len());

        let mut fetch_tasks = Vec::new();

        // Clone so `to_fetch` can be reassigned after the round completes.
        for (uri, path) in to_fetch.clone() {
            let client = client.clone();
            let pds_url = pds_url.clone();

            fetch_tasks.push(async move {
                // Parse URI
                // Expected shape: at://<did>/<collection>/<rkey>
                let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
                if parts.len() < 3 {
                    return Err(miette::miette!("Invalid subfs URI: {}", uri));
                }

                let did_str = parts[0];
                let collection = parts[1];
                let rkey_str = parts[2];

                if collection != "place.wisp.subfs" {
                    return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection));
                }

                // Fetch using GetRecord
                use jacquard::api::com_atproto::repo::get_record::GetRecord;
                use jacquard_common::types::string::{Rkey as RkeyType, Did as DidType, RecordKey};
                use jacquard_common::types::ident::AtIdentifier;

                let rkey_parsed = RkeyType::new(rkey_str).into_diagnostic()?;
                let did_parsed = DidType::new(did_str).into_diagnostic()?;

                let request = GetRecord::new()
                    .repo(AtIdentifier::Did(did_parsed))
                    .collection(CowStr::from("place.wisp.subfs"))
                    .rkey(RecordKey::from(rkey_parsed))
                    .build();

                let response = client
                    .xrpc(pds_url)
                    .send(&request)
                    .await
                    .into_diagnostic()?;

                let record_output = response.into_output().into_diagnostic()?;
                let subfs_record: SubfsRecord = from_data(&record_output.value).into_diagnostic()?;

                Ok::<_, miette::Report>((path, subfs_record.into_static()))
            });
        }

        // All fetches in a round run concurrently.
        let results: Vec<_> = futures::future::join_all(fetch_tasks).await;

        // Process results and find nested subfs
        let mut newly_found_uris = Vec::new();
        for result in results {
            match result {
                Ok((path, record)) => {
                    println!("  ✓ Fetched subfs at {}", path);

                    // Extract nested subfs URIs
                    let nested_uris = extract_subfs_uris_from_subfs_dir(&record.root, path.clone());
                    newly_found_uris.extend(nested_uris);

                    all_subfs_map.insert(path, record.root);
                }
                Err(e) => {
                    // Non-fatal: the unexpanded node is dropped at merge time.
                    eprintln!("  ⚠️  Failed to fetch subfs: {}", e);
                }
            }
        }

        // Filter out already-fetched paths
        to_fetch = newly_found_uris
            .into_iter()
            .filter(|(_, path)| !all_subfs_map.contains_key(path))
            .collect();
    }

    if iteration >= MAX_ITERATIONS {
        eprintln!("⚠️  Max iterations reached while fetching nested subfs");
    }

    println!("  Total subfs records fetched: {}", all_subfs_map.len());

    // Now replace all subfs nodes with their content
    Ok(replace_subfs_with_content(directory.clone(), &all_subfs_map, String::new()))
}
516
517/// Extract subfs URIs from a subfs::Directory (helper for pull)
518fn extract_subfs_uris_from_subfs_dir(
519 directory: &crate::place_wisp::subfs::Directory,
520 current_path: String,
521) -> Vec<(String, String)> {
522 let mut uris = Vec::new();
523
524 for entry in &directory.entries {
525 let full_path = if current_path.is_empty() {
526 entry.name.to_string()
527 } else {
528 format!("{}/{}", current_path, entry.name)
529 };
530
531 match &entry.node {
532 crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => {
533 uris.push((subfs_node.subject.to_string(), full_path.clone()));
534 }
535 crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
536 let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path);
537 uris.extend(nested);
538 }
539 _ => {}
540 }
541 }
542
543 uris
544}
545
546/// Recursively replace subfs nodes with their actual content
547fn replace_subfs_with_content(
548 directory: Directory,
549 subfs_map: &HashMap<String, crate::place_wisp::subfs::Directory>,
550 current_path: String,
551) -> Directory<'static> {
552 use jacquard_common::IntoStatic;
553
554 let new_entries: Vec<Entry<'static>> = directory
555 .entries
556 .into_iter()
557 .flat_map(|entry| {
558 let full_path = if current_path.is_empty() {
559 entry.name.to_string()
560 } else {
561 format!("{}/{}", current_path, entry.name)
562 };
563
564 match entry.node {
565 EntryNode::Subfs(subfs_node) => {
566 // Check if we have this subfs record
567 if let Some(subfs_dir) = subfs_map.get(&full_path) {
568 let flat = subfs_node.flat.unwrap_or(true); // Default to flat merge
569
570 if flat {
571 // Flat merge: hoist subfs entries into parent
572 println!(" Merging subfs {} (flat)", full_path);
573 let converted_entries: Vec<Entry<'static>> = subfs_dir
574 .entries
575 .iter()
576 .map(|subfs_entry| convert_subfs_entry_to_fs(subfs_entry.clone().into_static()))
577 .collect();
578
579 converted_entries
580 } else {
581 // Nested: create a directory with the subfs name
582 println!(" Merging subfs {} (nested)", full_path);
583 let converted_entries: Vec<Entry<'static>> = subfs_dir
584 .entries
585 .iter()
586 .map(|subfs_entry| convert_subfs_entry_to_fs(subfs_entry.clone().into_static()))
587 .collect();
588
589 vec![Entry::new()
590 .name(entry.name.into_static())
591 .node(EntryNode::Directory(Box::new(
592 Directory::new()
593 .r#type(CowStr::from("directory"))
594 .entries(converted_entries)
595 .build()
596 )))
597 .build()]
598 }
599 } else {
600 // Subfs not found, skip with warning
601 eprintln!(" ⚠️ Subfs not found: {}", full_path);
602 vec![]
603 }
604 }
605 EntryNode::Directory(dir) => {
606 // Recursively process subdirectories
607 vec![Entry::new()
608 .name(entry.name.into_static())
609 .node(EntryNode::Directory(Box::new(
610 replace_subfs_with_content(*dir, subfs_map, full_path)
611 )))
612 .build()]
613 }
614 EntryNode::File(_) => {
615 vec![entry.into_static()]
616 }
617 EntryNode::Unknown(_) => {
618 vec![entry.into_static()]
619 }
620 }
621 })
622 .collect();
623
624 Directory::new()
625 .r#type(CowStr::from("directory"))
626 .entries(new_entries)
627 .build()
628}
629
630/// Convert a subfs entry to a fs entry (they have the same structure but different types)
631fn convert_subfs_entry_to_fs(subfs_entry: crate::place_wisp::subfs::Entry<'static>) -> Entry<'static> {
632 use jacquard_common::IntoStatic;
633
634 let node = match subfs_entry.node {
635 crate::place_wisp::subfs::EntryNode::File(file) => {
636 EntryNode::File(Box::new(
637 File::new()
638 .r#type(file.r#type.into_static())
639 .blob(file.blob.into_static())
640 .encoding(file.encoding.map(|e| e.into_static()))
641 .mime_type(file.mime_type.map(|m| m.into_static()))
642 .base64(file.base64)
643 .build()
644 ))
645 }
646 crate::place_wisp::subfs::EntryNode::Directory(dir) => {
647 let converted_entries: Vec<Entry<'static>> = dir
648 .entries
649 .into_iter()
650 .map(|e| convert_subfs_entry_to_fs(e.into_static()))
651 .collect();
652
653 EntryNode::Directory(Box::new(
654 Directory::new()
655 .r#type(dir.r#type.into_static())
656 .entries(converted_entries)
657 .build()
658 ))
659 }
660 crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => {
661 // Nested subfs should have been expanded already - if we get here, it means expansion failed
662 // Treat it like a directory reference that should have been expanded
663 eprintln!(" ⚠️ Warning: unexpanded nested subfs at path, treating as empty directory");
664 EntryNode::Directory(Box::new(
665 Directory::new()
666 .r#type(CowStr::from("directory"))
667 .entries(vec![])
668 .build()
669 ))
670 }
671 crate::place_wisp::subfs::EntryNode::Unknown(unknown) => {
672 EntryNode::Unknown(unknown)
673 }
674 };
675
676 Entry::new()
677 .name(subfs_entry.name.into_static())
678 .node(node)
679 .build()
680}
681