Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

fix subfs nesting
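
A directory too large for a single place.wisp.subfs record (over the 75KB soft
limit) is now split into chunk records. A parent subfs record references each
chunk (chunk0, chunk1, …) with flat: true, so chunk contents are merged back
into one directory on expansion, while the parent itself is still mounted with
flat: false to preserve the directory's position in the tree. pull and blob-map
merging now resolve nested subfs records recursively (capped at 10 iterations)
and skip chunk-only parent records when merging blobs.

Roughly, the record graph after a split looks like this (illustrative sketch;
"imgs" stands in for any oversized directory, <parent>/<tid0>/<tid1> for TIDs):

  place.wisp.fs (main record)
  └── imgs → subfs { subject: at://…/place.wisp.subfs/<parent>, flat: false }
      parent record entries:
      ├── chunk0 → subfs { subject: at://…/place.wisp.subfs/<tid0>, flat: true }
      └── chunk1 → subfs { subject: at://…/place.wisp.subfs/<tid1>, flat: true }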

nekomimi.pet e3f99bc1 c544254d

verified
Changed files: +340 -85

cli/src/main.rs  +115 -19
···
use jacquard::oauth::client::OAuthClient;
use jacquard::oauth::loopback::LoopbackConfig;
use jacquard::prelude::IdentityResolver;
- use jacquard_common::types::string::{Datetime, Rkey, RecordKey};
+ use jacquard_common::types::string::{Datetime, Rkey, RecordKey, AtUri};
use jacquard_common::types::blob::MimeType;
use miette::IntoDiagnostic;
use std::path::{Path, PathBuf};
···
println!(" Split #{}: {} ({} files, {:.1}KB)",
    attempts, largest_dir.path, largest_dir.file_count, largest_dir.size as f64 / 1024.0);
- // Create a subfs record for this directory
- use jacquard_common::types::string::Tid;
- let subfs_tid = Tid::now_0();
- let subfs_rkey = subfs_tid.to_string();
+ // Check if this directory is itself too large for a single subfs record
+ const MAX_SUBFS_SIZE: usize = 75 * 1024; // 75KB soft limit for safety
+ let mut subfs_uri = String::new();
+
+ if largest_dir.size > MAX_SUBFS_SIZE {
+     // Need to split this directory into multiple chunks
+     println!(" → Directory too large, splitting into chunks...");
+     let chunks = subfs_utils::split_directory_into_chunks(&largest_dir.directory, MAX_SUBFS_SIZE);
+     println!(" → Created {} chunks", chunks.len());
+
+     // Upload each chunk as a subfs record
+     let mut chunk_uris = Vec::new();
+     for (i, chunk) in chunks.iter().enumerate() {
+         use jacquard_common::types::string::Tid;
+         let chunk_tid = Tid::now_0();
+         let chunk_rkey = chunk_tid.to_string();
+
+         let chunk_file_count = subfs_utils::count_files_in_directory(chunk);
+         let chunk_size = subfs_utils::estimate_directory_size(chunk);
- let subfs_manifest = crate::place_wisp::subfs::SubfsRecord::new()
-     .root(convert_fs_dir_to_subfs_dir(largest_dir.directory.clone()))
-     .file_count(Some(largest_dir.file_count as i64))
-     .created_at(Datetime::now())
-     .build();
+         let chunk_manifest = crate::place_wisp::subfs::SubfsRecord::new()
+             .root(convert_fs_dir_to_subfs_dir(chunk.clone()))
+             .file_count(Some(chunk_file_count as i64))
+             .created_at(Datetime::now())
+             .build();
+
+         println!(" → Uploading chunk {}/{} ({} files, {:.1}KB)...",
+             i + 1, chunks.len(), chunk_file_count, chunk_size as f64 / 1024.0);
+
+         let chunk_output = agent.put_record(
+             RecordKey::from(Rkey::new(&chunk_rkey).into_diagnostic()?),
+             chunk_manifest
+         ).await.into_diagnostic()?;
- // Upload subfs record
- let subfs_output = agent.put_record(
-     RecordKey::from(Rkey::new(&subfs_rkey).into_diagnostic()?),
-     subfs_manifest
- ).await.into_diagnostic()?;
+         let chunk_uri = chunk_output.uri.to_string();
+         chunk_uris.push((chunk_uri.clone(), format!("{}#{}", largest_dir.path, i)));
+         new_subfs_uris.push((chunk_uri.clone(), format!("{}#{}", largest_dir.path, i)));
+     }
- let subfs_uri = subfs_output.uri.to_string();
- println!(" ✅ Created subfs: {}", subfs_uri);
+     // Create a parent subfs record that references all chunks
+     // Each chunk reference MUST have flat: true to merge chunk contents
+     println!(" → Creating parent subfs with {} chunk references...", chunk_uris.len());
+     use jacquard_common::CowStr;
+     use crate::place_wisp::fs::{Subfs};
- // Replace directory with subfs node (flat: false to preserve structure)
+     // Convert to fs::Subfs (which has the 'flat' field) instead of subfs::Subfs
+     let parent_entries_fs: Vec<Entry> = chunk_uris.iter().enumerate().map(|(i, (uri, _))| {
+         let uri_string = uri.clone();
+         let at_uri = AtUri::new_cow(CowStr::from(uri_string)).expect("valid URI");
+         Entry::new()
+             .name(CowStr::from(format!("chunk{}", i)))
+             .node(EntryNode::Subfs(Box::new(
+                 Subfs::new()
+                     .r#type(CowStr::from("subfs"))
+                     .subject(at_uri)
+                     .flat(Some(true)) // EXPLICITLY TRUE - merge chunk contents
+                     .build()
+             )))
+             .build()
+     }).collect();
+
+     let parent_root_fs = Directory::new()
+         .r#type(CowStr::from("directory"))
+         .entries(parent_entries_fs)
+         .build();
+
+     // Convert to subfs::Directory for the parent subfs record
+     let parent_root_subfs = convert_fs_dir_to_subfs_dir(parent_root_fs);
+
+     use jacquard_common::types::string::Tid;
+     let parent_tid = Tid::now_0();
+     let parent_rkey = parent_tid.to_string();
+
+     let parent_manifest = crate::place_wisp::subfs::SubfsRecord::new()
+         .root(parent_root_subfs)
+         .file_count(Some(largest_dir.file_count as i64))
+         .created_at(Datetime::now())
+         .build();
+
+     let parent_output = agent.put_record(
+         RecordKey::from(Rkey::new(&parent_rkey).into_diagnostic()?),
+         parent_manifest
+     ).await.into_diagnostic()?;
+
+     subfs_uri = parent_output.uri.to_string();
+     println!(" ✅ Created parent subfs with chunks (flat=true on each chunk): {}", subfs_uri);
+ } else {
+     // Directory fits in a single subfs record
+     use jacquard_common::types::string::Tid;
+     let subfs_tid = Tid::now_0();
+     let subfs_rkey = subfs_tid.to_string();
+
+     let subfs_manifest = crate::place_wisp::subfs::SubfsRecord::new()
+         .root(convert_fs_dir_to_subfs_dir(largest_dir.directory.clone()))
+         .file_count(Some(largest_dir.file_count as i64))
+         .created_at(Datetime::now())
+         .build();
+
+     // Upload subfs record
+     let subfs_output = agent.put_record(
+         RecordKey::from(Rkey::new(&subfs_rkey).into_diagnostic()?),
+         subfs_manifest
+     ).await.into_diagnostic()?;
+
+     subfs_uri = subfs_output.uri.to_string();
+     println!(" ✅ Created subfs: {}", subfs_uri);
+ }
+
+ // Replace directory with subfs node (flat: false to preserve directory structure)
working_directory = subfs_utils::replace_directory_with_subfs(
    working_directory,
    &largest_dir.path,
    &subfs_uri,
-     false // Preserve directory structure
+     false // Preserve directory - the chunks inside have flat=true
)?;
new_subfs_uris.push((subfs_uri, largest_dir.path.clone()));
···
}
return Ok((file_builder.build(), true));
+ } else {
+     // CID mismatch - file changed
+     println!(" → File changed: {} (old CID: {}, new CID: {})", file_path_key, existing_cid, file_cid);
+ }
+ } else {
+     // File not in existing blob map
+     if file_path_key.starts_with("imgs/") {
+         println!(" → New file (not in blob map): {}", file_path_key);
}
}
cli/src/pull.rs  +30 -32
···
let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
println!("Resolved PDS: {}", pds_url);
- // Fetch the place.wisp.fs record
-
+ // Create a temporary agent for fetching records (no auth needed for public reads)
println!("Fetching record from PDS...");
let client = reqwest::Client::new();
-
+ // Use com.atproto.repo.getRecord
use jacquard::api::com_atproto::repo::get_record::GetRecord;
use jacquard_common::types::string::Rkey as RkeyType;
let rkey_parsed = RkeyType::new(&rkey).into_diagnostic()?;
-
+ use jacquard_common::types::ident::AtIdentifier;
use jacquard_common::types::string::RecordKey;
let request = GetRecord::new()
···
println!("Found site '{}' with {} files (in main record)", fs_record.site, file_count);
// Check for and expand subfs nodes
- let expanded_root = expand_subfs_in_pull(&fs_record.root, &pds_url, did.as_str()).await?;
+ // Note: We use a custom expand function for pull since we don't have an Agent
+ let expanded_root = expand_subfs_in_pull_with_client(&fs_record.root, &client, &pds_url).await?;
let total_file_count = subfs_utils::count_files_in_directory(&expanded_root);
if total_file_count as i64 != fs_record.file_count.unwrap_or(0) {
···
}
/// Expand subfs nodes in a directory tree by fetching and merging subfs records (RECURSIVELY)
- async fn expand_subfs_in_pull<'a>(
+ /// Uses reqwest client directly for pull command (no agent needed)
+ async fn expand_subfs_in_pull_with_client<'a>(
    directory: &Directory<'a>,
+     client: &reqwest::Client,
    pds_url: &Url,
-     _did: &str,
) -> miette::Result<Directory<'static>> {
+ use jacquard_common::IntoStatic;
+ use jacquard_common::types::value::from_data;
use crate::place_wisp::subfs::SubfsRecord;
- use jacquard_common::types::value::from_data;
- use jacquard_common::IntoStatic;
- // Recursively fetch ALL subfs records (including nested ones)
let mut all_subfs_map: HashMap<String, crate::place_wisp::subfs::Directory> = HashMap::new();
let mut to_fetch = subfs_utils::extract_subfs_uris(directory, String::new());
···
}
println!("Found {} subfs records, fetching recursively...", to_fetch.len());
- let client = reqwest::Client::new();
- // Keep fetching until we've resolved all subfs (including nested ones)
let mut iteration = 0;
- const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
+ const MAX_ITERATIONS: usize = 10;
while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
    iteration += 1;
···
let pds_url = pds_url.clone();
fetch_tasks.push(async move {
+     // Parse URI
    let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
    if parts.len() < 3 {
        return Err(miette::miette!("Invalid subfs URI: {}", uri));
    }
-     let _did = parts[0];
+     let did_str = parts[0];
    let collection = parts[1];
-     let rkey = parts[2];
+     let rkey_str = parts[2];
    if collection != "place.wisp.subfs" {
        return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection));
    }
+     // Fetch using GetRecord
    use jacquard::api::com_atproto::repo::get_record::GetRecord;
-     use jacquard_common::types::string::Rkey as RkeyType;
+     use jacquard_common::types::string::{Rkey as RkeyType, Did as DidType, RecordKey};
    use jacquard_common::types::ident::AtIdentifier;
-     use jacquard_common::types::string::{RecordKey, Did as DidType};
-     let rkey_parsed = RkeyType::new(rkey).into_diagnostic()?;
-     let did_parsed = DidType::new(_did).into_diagnostic()?;
+     let rkey_parsed = RkeyType::new(rkey_str).into_diagnostic()?;
+     let did_parsed = DidType::new(did_str).into_diagnostic()?;
    let request = GetRecord::new()
        .repo(AtIdentifier::Did(did_parsed))
···
let record_output = response.into_output().into_diagnostic()?;
let subfs_record: SubfsRecord = from_data(&record_output.value).into_diagnostic()?;
- let subfs_record_static = subfs_record.into_static();
- Ok::<_, miette::Report>((path, subfs_record_static))
+ Ok::<_, miette::Report>((path, subfs_record.into_static()))
});
}
let results: Vec<_> = futures::future::join_all(fetch_tasks).await;
// Process results and find nested subfs
- let mut newly_fetched = Vec::new();
+ let mut newly_found_uris = Vec::new();
for result in results {
    match result {
        Ok((path, record)) => {
            println!(" ✓ Fetched subfs at {}", path);
-             // Check for nested subfs in this record
-             let nested_subfs = extract_subfs_from_subfs_dir(&record.root, path.clone());
-             newly_fetched.extend(nested_subfs);
+             // Extract nested subfs URIs
+             let nested_uris = extract_subfs_uris_from_subfs_dir(&record.root, path.clone());
+             newly_found_uris.extend(nested_uris);
            all_subfs_map.insert(path, record.root);
        }
···
    }
}
- // Update to_fetch with only the NEW subfs we haven't fetched yet
- to_fetch = newly_fetched
+ // Filter out already-fetched paths
+ to_fetch = newly_found_uris
    .into_iter()
-     .filter(|(uri, _)| !all_subfs_map.iter().any(|(k, _)| k == uri))
+     .filter(|(_, path)| !all_subfs_map.contains_key(path))
    .collect();
}
if iteration >= MAX_ITERATIONS {
-     return Err(miette::miette!("Max iterations reached while fetching nested subfs"));
+     eprintln!("⚠️ Max iterations reached while fetching nested subfs");
}
println!(" Total subfs records fetched: {}", all_subfs_map.len());
···
Ok(replace_subfs_with_content(directory.clone(), &all_subfs_map, String::new()))
}
- /// Extract subfs URIs from a subfs::Directory
- fn extract_subfs_from_subfs_dir(
+ /// Extract subfs URIs from a subfs::Directory (helper for pull)
+ fn extract_subfs_uris_from_subfs_dir(
    directory: &crate::place_wisp::subfs::Directory,
    current_path: String,
) -> Vec<(String, String)> {
···
            uris.push((subfs_node.subject.to_string(), full_path.clone()));
        }
        crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
-             let nested = extract_subfs_from_subfs_dir(subdir, full_path);
+             let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path);
            uris.extend(nested);
        }
        _ => {}
cli/src/subfs_utils.rs  +195 -34
···
Ok(record_output.value.into_static())
}
- /// Merge blob maps from subfs records into the main blob map
- /// Returns the total number of blobs merged from all subfs records
- pub async fn merge_subfs_blob_maps(
+ /// Recursively fetch all subfs records (including nested ones)
+ /// Returns a list of (mount_path, SubfsRecord) tuples
+ /// Note: Multiple records can have the same mount_path (for flat-merged chunks)
+ pub async fn fetch_all_subfs_records_recursive(
    agent: &Agent<impl AgentSession + IdentityResolver>,
-     subfs_uris: Vec<(String, String)>,
-     main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>,
- ) -> miette::Result<usize> {
-     let mut total_merged = 0;
+     initial_uris: Vec<(String, String)>,
+ ) -> miette::Result<Vec<(String, SubfsRecord<'static>)>> {
+     use futures::stream::{self, StreamExt};
-     println!("Fetching {} subfs records for blob reuse...", subfs_uris.len());
+     let mut all_subfs: Vec<(String, SubfsRecord<'static>)> = Vec::new();
+     let mut fetched_uris: std::collections::HashSet<String> = std::collections::HashSet::new();
+     let mut to_fetch = initial_uris;
-     // Fetch all subfs records in parallel (but with some concurrency limit)
-     use futures::stream::{self, StreamExt};
+     if to_fetch.is_empty() {
+         return Ok(all_subfs);
+     }
+
+     println!("Found {} subfs records, fetching recursively...", to_fetch.len());
+
+     let mut iteration = 0;
+     const MAX_ITERATIONS: usize = 10;
-     let subfs_results: Vec<_> = stream::iter(subfs_uris)
-         .map(|(uri, mount_path)| async move {
-             match fetch_subfs_record(agent, &uri).await {
-                 Ok(record) => Some((record, mount_path)),
-                 Err(e) => {
-                     eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e);
-                     None
+     while !to_fetch.is_empty() && iteration < MAX_ITERATIONS {
+         iteration += 1;
+         println!(" Iteration {}: fetching {} subfs records...", iteration, to_fetch.len());
+
+         let subfs_results: Vec<_> = stream::iter(to_fetch.clone())
+             .map(|(uri, mount_path)| async move {
+                 match fetch_subfs_record(agent, &uri).await {
+                     Ok(record) => Some((mount_path, record, uri)),
+                     Err(e) => {
+                         eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e);
+                         None
+                     }
                }
+             })
+             .buffer_unordered(5)
+             .collect()
+             .await;
+
+         // Process results and find nested subfs
+         let mut newly_found_uris = Vec::new();
+         for result in subfs_results {
+             if let Some((mount_path, record, uri)) = result {
+                 println!(" ✓ Fetched subfs at {}", mount_path);
+
+                 // Extract nested subfs URIs from this record
+                 let nested_uris = extract_subfs_uris_from_subfs_dir(&record.root, mount_path.clone());
+                 newly_found_uris.extend(nested_uris);
+
+                 all_subfs.push((mount_path, record));
+                 fetched_uris.insert(uri);
            }
-         })
-         .buffer_unordered(5)
-         .collect()
-         .await;
+         }
-     // Convert subfs Directory to fs Directory for blob extraction
-     // Note: We need to extract blobs from the subfs record's root
-     for result in subfs_results {
-         if let Some((subfs_record, mount_path)) = result {
-             // Extract blobs from this subfs record's root
-             // The blob_map module works with fs::Directory, but subfs::Directory has the same structure
-             // We need to convert or work directly with the entries
+         // Filter out already-fetched URIs (based on URI, not path)
+         to_fetch = newly_found_uris
+             .into_iter()
+             .filter(|(uri, _)| !fetched_uris.contains(uri))
+             .collect();
+     }
-             let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone());
-             let count = subfs_blob_map.len();
+     if iteration >= MAX_ITERATIONS {
+         eprintln!("⚠️ Max iterations reached while fetching nested subfs");
+     }
-             for (path, blob_info) in subfs_blob_map {
-                 main_blob_map.insert(path, blob_info);
+     println!(" Total subfs records fetched: {}", all_subfs.len());
+
+     Ok(all_subfs)
+ }
+
+ /// Extract subfs URIs from a subfs::Directory
+ fn extract_subfs_uris_from_subfs_dir(
+     directory: &crate::place_wisp::subfs::Directory,
+     current_path: String,
+ ) -> Vec<(String, String)> {
+     let mut uris = Vec::new();
+
+     for entry in &directory.entries {
+         match &entry.node {
+             crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => {
+                 // Check if this is a chunk entry (chunk0, chunk1, etc.)
+                 // Chunks should be flat-merged, so use the parent's path
+                 let mount_path = if entry.name.starts_with("chunk") &&
+                     entry.name.chars().skip(5).all(|c| c.is_ascii_digit()) {
+                     // This is a chunk - use parent's path for flat merge
+                     println!(" → Found chunk {} at {}, will flat-merge to {}", entry.name, current_path, current_path);
+                     current_path.clone()
+                 } else {
+                     // Normal subfs - append name to path
+                     if current_path.is_empty() {
+                         entry.name.to_string()
+                     } else {
+                         format!("{}/{}", current_path, entry.name)
+                     }
+                 };
+
+                 uris.push((subfs_node.subject.to_string(), mount_path));
            }
+             crate::place_wisp::subfs::EntryNode::Directory(subdir) => {
+                 let full_path = if current_path.is_empty() {
+                     entry.name.to_string()
+                 } else {
+                     format!("{}/{}", current_path, entry.name)
+                 };
+                 let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path);
+                 uris.extend(nested);
+             }
+             _ => {}
+         }
+     }
- total_merged += count;
- println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path);
+     uris
+ }
+
+ /// Merge blob maps from subfs records into the main blob map (RECURSIVE)
+ /// Returns the total number of blobs merged from all subfs records
+ pub async fn merge_subfs_blob_maps(
+     agent: &Agent<impl AgentSession + IdentityResolver>,
+     subfs_uris: Vec<(String, String)>,
+     main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>,
+ ) -> miette::Result<usize> {
+     // Fetch all subfs records recursively
+     let all_subfs = fetch_all_subfs_records_recursive(agent, subfs_uris).await?;
+
+     let mut total_merged = 0;
+
+     // Extract blobs from all fetched subfs records
+     // Skip parent records that only contain chunk references (no actual files)
+     for (mount_path, subfs_record) in all_subfs {
+         // Check if this record only contains chunk subfs references (no files)
+         let only_has_chunks = subfs_record.root.entries.iter().all(|e| {
+             matches!(&e.node, crate::place_wisp::subfs::EntryNode::Subfs(_)) &&
+             e.name.starts_with("chunk") &&
+             e.name.chars().skip(5).all(|c| c.is_ascii_digit())
+         });
+
+         if only_has_chunks && !subfs_record.root.entries.is_empty() {
+             // This is a parent containing only chunks - skip it, blobs are in the chunks
+             println!(" → Skipping parent subfs at {} ({} chunks, no files)", mount_path, subfs_record.root.entries.len());
+             continue;
+         }
+
+         let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone());
+         let count = subfs_blob_map.len();
+
+         for (path, blob_info) in subfs_blob_map {
+             main_blob_map.insert(path, blob_info);
        }
+
+         total_merged += count;
+         println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path);
    }
    Ok(total_merged)
···
Ok(())
}
+
+ /// Split a large directory into multiple smaller chunks
+ /// Returns a list of chunk directories, each small enough to fit in a subfs record
+ pub fn split_directory_into_chunks(
+     directory: &FsDirectory,
+     max_size: usize,
+ ) -> Vec<FsDirectory<'static>> {
+     use jacquard_common::CowStr;
+
+     let mut chunks = Vec::new();
+     let mut current_chunk_entries = Vec::new();
+     let mut current_chunk_size = 100; // Base size for directory structure
+
+     for entry in &directory.entries {
+         // Estimate the size of this entry
+         let entry_size = estimate_entry_size(entry);
+
+         // If adding this entry would exceed the max size, start a new chunk
+         if !current_chunk_entries.is_empty() && (current_chunk_size + entry_size > max_size) {
+             // Create a chunk from current entries
+             let chunk = FsDirectory::new()
+                 .r#type(CowStr::from("directory"))
+                 .entries(current_chunk_entries.clone())
+                 .build();
+
+             chunks.push(chunk);
+
+             // Start new chunk
+             current_chunk_entries.clear();
+             current_chunk_size = 100;
+         }
+
+         current_chunk_entries.push(entry.clone().into_static());
+         current_chunk_size += entry_size;
+     }
+
+     // Add the last chunk if it has any entries
+     if !current_chunk_entries.is_empty() {
+         let chunk = FsDirectory::new()
+             .r#type(CowStr::from("directory"))
+             .entries(current_chunk_entries)
+             .build();
+         chunks.push(chunk);
+     }
+
+     chunks
+ }
+
+ /// Estimate the JSON size of a single entry
+ fn estimate_entry_size(entry: &crate::place_wisp::fs::Entry) -> usize {
+     match serde_json::to_string(entry) {
+         Ok(json) => json.len(),
+         Err(_) => 500, // Conservative estimate if serialization fails
+     }
+ }