Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
wisp.place
1use jacquard_common::types::string::AtUri;
2use jacquard_common::types::blob::BlobRef;
3use jacquard_common::IntoStatic;
4use jacquard::client::{Agent, AgentSession, AgentSessionExt};
5use jacquard::prelude::IdentityResolver;
6use miette::IntoDiagnostic;
7use std::collections::HashMap;
8
9use crate::place_wisp::fs::{Directory as FsDirectory, EntryNode as FsEntryNode};
10use crate::place_wisp::subfs::SubfsRecord;
11
12/// Extract all subfs URIs from a directory tree with their mount paths
13pub fn extract_subfs_uris(directory: &FsDirectory, current_path: String) -> Vec<(String, String)> {
14 let mut uris = Vec::new();
15
16 for entry in &directory.entries {
17 let full_path = if current_path.is_empty() {
18 entry.name.to_string()
19 } else {
20 format!("{}/{}", current_path, entry.name)
21 };
22
23 match &entry.node {
24 FsEntryNode::Subfs(subfs_node) => {
25 // Found a subfs node - store its URI and mount path
26 uris.push((subfs_node.subject.to_string(), full_path.clone()));
27 }
28 FsEntryNode::Directory(subdir) => {
29 // Recursively search subdirectories
30 let sub_uris = extract_subfs_uris(subdir, full_path);
31 uris.extend(sub_uris);
32 }
33 FsEntryNode::File(_) => {
34 // Files don't contain subfs
35 }
36 FsEntryNode::Unknown(_) => {
37 // Skip unknown nodes
38 }
39 }
40 }
41
42 uris
43}
44
45/// Fetch a subfs record from the PDS
46pub async fn fetch_subfs_record(
47 agent: &Agent<impl AgentSession + IdentityResolver>,
48 uri: &str,
49) -> miette::Result<SubfsRecord<'static>> {
50 // Parse URI: at://did/collection/rkey
51 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
52
53 if parts.len() < 3 {
54 return Err(miette::miette!("Invalid subfs URI: {}", uri));
55 }
56
57 let _did = parts[0];
58 let collection = parts[1];
59 let _rkey = parts[2];
60
61 if collection != "place.wisp.subfs" {
62 return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection));
63 }
64
65 // Construct AT-URI for fetching
66 let at_uri = AtUri::new(uri).into_diagnostic()?;
67
68 // Fetch the record
69 let response = agent.get_record::<SubfsRecord>(&at_uri).await.into_diagnostic()?;
70 let record_output = response.into_output().into_diagnostic()?;
71
72 Ok(record_output.value.into_static())
73}
74
75/// Recursively fetch all subfs records (including nested ones)
76/// Returns a list of (mount_path, SubfsRecord) tuples
77/// Note: Multiple records can have the same mount_path (for flat-merged chunks)
78pub async fn fetch_all_subfs_records_recursive(
79 agent: &Agent<impl AgentSession + IdentityResolver>,
80 initial_uris: Vec<(String, String)>,
81) -> miette::Result<Vec<(String, SubfsRecord<'static>)>> {
82 use futures::stream::{self, StreamExt};
83
84 let mut all_subfs: Vec<(String, SubfsRecord<'static>)> = Vec::new();
85 let mut fetched_uris: std::collections::HashSet<String> = std::collections::HashSet::new();
86 let mut to_fetch = initial_uris;
87
88 if to_fetch.is_empty() {
89 return Ok(all_subfs);
90 }
91
92 println!("Found {} subfs records, fetching recursively...", to_fetch.len());
93
94 let mut iteration = 0;
95 const MAX_ITERATIONS: usize = 10;
96
97 while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
98 iteration += 1;
99 println!(" Iteration {}: fetching {} subfs records...", iteration, to_fetch.len());
100
101 let subfs_results: Vec<_> = stream::iter(to_fetch.clone())
102 .map(|(uri, mount_path)| async move {
103 match fetch_subfs_record(agent, &uri).await {
104 Ok(record) => Some((mount_path, record, uri)),
105 Err(e) => {
106 eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e);
107 None
108 }
109 }
110 })
111 .buffer_unordered(5)
112 .collect()
113 .await;
114
115 // Process results and find nested subfs
116 let mut newly_found_uris = Vec::new();
117 for result in subfs_results {
118 if let Some((mount_path, record, uri)) = result {
119 println!(" ✓ Fetched subfs at {}", mount_path);
120
121 // Extract nested subfs URIs from this record
122 let nested_uris = extract_subfs_uris_from_subfs_dir(&record.root, mount_path.clone());
123 newly_found_uris.extend(nested_uris);
124
125 all_subfs.push((mount_path, record));
126 fetched_uris.insert(uri);
127 }
128 }
129
130 // Filter out already-fetched URIs (based on URI, not path)
131 to_fetch = newly_found_uris
132 .into_iter()
133 .filter(|(uri, _)| !fetched_uris.contains(uri))
134 .collect();
135 }
136
137 if iteration >= MAX_ITERATIONS {
138 eprintln!("⚠️ Max iterations reached while fetching nested subfs");
139 }
140
141 println!(" Total subfs records fetched: {}", all_subfs.len());
142
143 Ok(all_subfs)
144}
145
146/// Extract subfs URIs from a subfs::Directory
147fn extract_subfs_uris_from_subfs_dir(
148 directory: &crate::place_wisp::subfs::Directory,
149 current_path: String,
150) -> Vec<(String, String)> {
151 let mut uris = Vec::new();
152
153 for entry in &directory.entries {
154 match &entry.node {
155 crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => {
156 // Check if this is a chunk entry (chunk0, chunk1, etc.)
157 // Chunks should be flat-merged, so use the parent's path
158 let mount_path = if entry.name.starts_with("chunk") &&
159 entry.name.chars().skip(5).all(|c| c.is_ascii_digit()) {
160 // This is a chunk - use parent's path for flat merge
161 println!(" → Found chunk {} at {}, will flat-merge to {}", entry.name, current_path, current_path);
162 current_path.clone()
163 } else {
164 // Normal subfs - append name to path
165 if current_path.is_empty() {
166 entry.name.to_string()
167 } else {
168 format!("{}/{}", current_path, entry.name)
169 }
170 };
171
172 uris.push((subfs_node.subject.to_string(), mount_path));
173 }
174 crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
175 let full_path = if current_path.is_empty() {
176 entry.name.to_string()
177 } else {
178 format!("{}/{}", current_path, entry.name)
179 };
180 let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path);
181 uris.extend(nested);
182 }
183 _ => {}
184 }
185 }
186
187 uris
188}
189
190/// Merge blob maps from subfs records into the main blob map (RECURSIVE)
191/// Returns the total number of blobs merged from all subfs records
192pub async fn merge_subfs_blob_maps(
193 agent: &Agent<impl AgentSession + IdentityResolver>,
194 subfs_uris: Vec<(String, String)>,
195 main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>,
196) -> miette::Result<usize> {
197 // Fetch all subfs records recursively
198 let all_subfs = fetch_all_subfs_records_recursive(agent, subfs_uris).await?;
199
200 let mut total_merged = 0;
201
202 // Extract blobs from all fetched subfs records
203 // Skip parent records that only contain chunk references (no actual files)
204 for (mount_path, subfs_record) in all_subfs {
205 // Check if this record only contains chunk subfs references (no files)
206 let only_has_chunks = subfs_record.root.entries.iter().all(|e| {
207 matches!(&e.node, crate::place_wisp::subfs::EntryNode::Subfs(_)) &&
208 e.name.starts_with("chunk") &&
209 e.name.chars().skip(5).all(|c| c.is_ascii_digit())
210 });
211
212 if only_has_chunks && !subfs_record.root.entries.is_empty() {
213 // This is a parent containing only chunks - skip it, blobs are in the chunks
214 println!(" → Skipping parent subfs at {} ({} chunks, no files)", mount_path, subfs_record.root.entries.len());
215 continue;
216 }
217
218 let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone());
219 let count = subfs_blob_map.len();
220
221 for (path, blob_info) in subfs_blob_map {
222 main_blob_map.insert(path, blob_info);
223 }
224
225 total_merged += count;
226 println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path);
227 }
228
229 Ok(total_merged)
230}
231
232/// Extract blobs from a subfs directory (works with subfs::Directory)
233/// Returns a map of file paths to their blob refs and CIDs
234fn extract_subfs_blobs(
235 directory: &crate::place_wisp::subfs::Directory,
236 current_path: String,
237) -> HashMap<String, (BlobRef<'static>, String)> {
238 let mut blob_map = HashMap::new();
239
240 for entry in &directory.entries {
241 let full_path = if current_path.is_empty() {
242 entry.name.to_string()
243 } else {
244 format!("{}/{}", current_path, entry.name)
245 };
246
247 match &entry.node {
248 crate::place_wisp::subfs::EntryNode::File(file_node) => {
249 let blob_ref = &file_node.blob;
250 let cid_string = blob_ref.blob().r#ref.to_string();
251 blob_map.insert(
252 full_path,
253 (blob_ref.clone().into_static(), cid_string)
254 );
255 }
256 crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
257 let sub_map = extract_subfs_blobs(subdir, full_path);
258 blob_map.extend(sub_map);
259 }
260 crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => {
261 // Nested subfs - these should be resolved recursively in the main flow
262 // For now, we skip them (they'll be fetched separately)
263 eprintln!(" ⚠️ Found nested subfs at {}, skipping (should be fetched separately)", full_path);
264 }
265 crate::place_wisp::subfs::EntryNode::Unknown(_) => {
266 // Skip unknown nodes
267 }
268 }
269 }
270
271 blob_map
272}
273
274/// Count total files in a directory tree
275pub fn count_files_in_directory(directory: &FsDirectory) -> usize {
276 let mut count = 0;
277
278 for entry in &directory.entries {
279 match &entry.node {
280 FsEntryNode::File(_) => count += 1,
281 FsEntryNode::Directory(subdir) => {
282 count += count_files_in_directory(subdir);
283 }
284 FsEntryNode::Subfs(_) => {
285 // Subfs nodes don't count towards the main manifest file count
286 }
287 FsEntryNode::Unknown(_) => {}
288 }
289 }
290
291 count
292}
293
294/// Estimate JSON size of a directory tree
295pub fn estimate_directory_size(directory: &FsDirectory) -> usize {
296 // Serialize to JSON and measure
297 match serde_json::to_string(directory) {
298 Ok(json) => json.len(),
299 Err(_) => 0,
300 }
301}
302
303/// Information about a directory that could be split into a subfs record
/// Information about a directory that could be split into a subfs record
#[derive(Debug)]
pub struct SplittableDirectory {
    /// Slash-separated path of the directory within the site tree.
    pub path: String,
    /// Owned ('static) copy of the directory subtree that would be split out.
    pub directory: FsDirectory<'static>,
    /// Estimated JSON size of the subtree, in bytes.
    pub size: usize,
    /// Number of files contained in the subtree (recursive).
    pub file_count: usize,
}
311
312/// Find large directories that could be split into subfs records
313/// Returns directories sorted by size (largest first)
314pub fn find_large_directories(directory: &FsDirectory, current_path: String) -> Vec<SplittableDirectory> {
315 let mut result = Vec::new();
316
317 for entry in &directory.entries {
318 if let FsEntryNode::Directory(subdir) = &entry.node {
319 let dir_path = if current_path.is_empty() {
320 entry.name.to_string()
321 } else {
322 format!("{}/{}", current_path, entry.name)
323 };
324
325 let size = estimate_directory_size(subdir);
326 let file_count = count_files_in_directory(subdir);
327
328 result.push(SplittableDirectory {
329 path: dir_path.clone(),
330 directory: (*subdir.clone()).into_static(),
331 size,
332 file_count,
333 });
334
335 // Recursively find subdirectories
336 let subdirs = find_large_directories(subdir, dir_path);
337 result.extend(subdirs);
338 }
339 }
340
341 // Sort by size (largest first)
342 result.sort_by(|a, b| b.size.cmp(&a.size));
343
344 result
345}
346
347/// Replace a directory with a subfs node in the tree
348pub fn replace_directory_with_subfs(
349 directory: FsDirectory<'static>,
350 target_path: &str,
351 subfs_uri: &str,
352 flat: bool,
353) -> miette::Result<FsDirectory<'static>> {
354 use jacquard_common::CowStr;
355 use crate::place_wisp::fs::{Entry, Subfs};
356
357 let path_parts: Vec<&str> = target_path.split('/').collect();
358
359 if path_parts.is_empty() {
360 return Err(miette::miette!("Cannot replace root directory"));
361 }
362
363 // Parse the subfs URI and make it owned/'static
364 let at_uri = AtUri::new_cow(jacquard_common::CowStr::from(subfs_uri.to_string())).into_diagnostic()?;
365
366 // If this is a root-level directory
367 if path_parts.len() == 1 {
368 let target_name = path_parts[0];
369 let new_entries: Vec<Entry> = directory.entries.into_iter().map(|entry| {
370 if entry.name == target_name {
371 // Replace this directory with a subfs node
372 Entry::new()
373 .name(entry.name)
374 .node(FsEntryNode::Subfs(Box::new(
375 Subfs::new()
376 .r#type(CowStr::from("subfs"))
377 .subject(at_uri.clone())
378 .flat(Some(flat))
379 .build()
380 )))
381 .build()
382 } else {
383 entry
384 }
385 }).collect();
386
387 return Ok(FsDirectory::new()
388 .r#type(CowStr::from("directory"))
389 .entries(new_entries)
390 .build());
391 }
392
393 // Recursively navigate to parent directory
394 let first_part = path_parts[0];
395 let remaining_path = path_parts[1..].join("/");
396
397 let new_entries: Vec<Entry> = directory.entries.into_iter().filter_map(|entry| {
398 if entry.name == first_part {
399 if let FsEntryNode::Directory(subdir) = entry.node {
400 // Recursively process this subdirectory
401 match replace_directory_with_subfs((*subdir).into_static(), &remaining_path, subfs_uri, flat) {
402 Ok(updated_subdir) => {
403 Some(Entry::new()
404 .name(entry.name)
405 .node(FsEntryNode::Directory(Box::new(updated_subdir)))
406 .build())
407 }
408 Err(_) => None, // Skip entries that fail to update
409 }
410 } else {
411 Some(entry)
412 }
413 } else {
414 Some(entry)
415 }
416 }).collect();
417
418 Ok(FsDirectory::new()
419 .r#type(CowStr::from("directory"))
420 .entries(new_entries)
421 .build())
422}
423
424/// Delete a subfs record from the PDS
425pub async fn delete_subfs_record(
426 agent: &Agent<impl AgentSession + IdentityResolver>,
427 uri: &str,
428) -> miette::Result<()> {
429 use jacquard_common::types::uri::RecordUri;
430
431 // Construct AT-URI and convert to RecordUri
432 let at_uri = AtUri::new(uri).into_diagnostic()?;
433 let record_uri: RecordUri<'_, crate::place_wisp::subfs::SubfsRecordRecord> = RecordUri::try_from_uri(at_uri).into_diagnostic()?;
434
435 let rkey = record_uri.rkey()
436 .ok_or_else(|| miette::miette!("Invalid subfs URI: missing rkey"))?
437 .clone();
438
439 agent.delete_record::<SubfsRecord>(rkey).await.into_diagnostic()?;
440
441 Ok(())
442}
443
444/// Split a large directory into multiple smaller chunks
445/// Returns a list of chunk directories, each small enough to fit in a subfs record
446pub fn split_directory_into_chunks(
447 directory: &FsDirectory,
448 max_size: usize,
449) -> Vec<FsDirectory<'static>> {
450 use jacquard_common::CowStr;
451
452 let mut chunks = Vec::new();
453 let mut current_chunk_entries = Vec::new();
454 let mut current_chunk_size = 100; // Base size for directory structure
455
456 for entry in &directory.entries {
457 // Estimate the size of this entry
458 let entry_size = estimate_entry_size(entry);
459
460 // If adding this entry would exceed the max size, start a new chunk
461 if !current_chunk_entries.is_empty() && (current_chunk_size + entry_size > max_size) {
462 // Create a chunk from current entries
463 let chunk = FsDirectory::new()
464 .r#type(CowStr::from("directory"))
465 .entries(current_chunk_entries.clone())
466 .build();
467
468 chunks.push(chunk);
469
470 // Start new chunk
471 current_chunk_entries.clear();
472 current_chunk_size = 100;
473 }
474
475 current_chunk_entries.push(entry.clone().into_static());
476 current_chunk_size += entry_size;
477 }
478
479 // Add the last chunk if it has any entries
480 if !current_chunk_entries.is_empty() {
481 let chunk = FsDirectory::new()
482 .r#type(CowStr::from("directory"))
483 .entries(current_chunk_entries)
484 .build();
485 chunks.push(chunk);
486 }
487
488 chunks
489}
490
491/// Estimate the JSON size of a single entry
492fn estimate_entry_size(entry: &crate::place_wisp::fs::Entry) -> usize {
493 match serde_json::to_string(entry) {
494 Ok(json) => json.len(),
495 Err(_) => 500, // Conservative estimate if serialization fails
496 }
497}