Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
1use crate::pull::pull_site; 2use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule}; 3use crate::place_wisp::settings::Settings; 4use axum::{ 5 Router, 6 extract::Request, 7 response::{Response, IntoResponse, Redirect}, 8 http::{StatusCode, Uri, header}, 9 body::Body, 10}; 11use jacquard::CowStr; 12use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 13use jacquard::api::com_atproto::repo::get_record::GetRecord; 14use jacquard_common::types::string::Did; 15use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient, XrpcExt}; 16use jacquard_common::IntoStatic; 17use jacquard_common::types::value::from_data; 18use miette::IntoDiagnostic; 19use n0_future::StreamExt; 20use std::collections::HashMap; 21use std::path::{PathBuf, Path}; 22use std::sync::Arc; 23use tokio::sync::RwLock; 24use tower::Service; 25use tower_http::compression::CompressionLayer; 26use tower_http::services::ServeDir; 27 28/// Shared state for the server 29#[derive(Clone)] 30struct ServerState { 31 did: CowStr<'static>, 32 rkey: CowStr<'static>, 33 output_dir: PathBuf, 34 last_cid: Arc<RwLock<Option<String>>>, 35 redirect_rules: Arc<RwLock<Vec<RedirectRule>>>, 36 settings: Arc<RwLock<Option<Settings<'static>>>>, 37} 38 39/// Fetch settings for a site from the PDS 40async fn fetch_settings( 41 pds_url: &url::Url, 42 did: &Did<'_>, 43 rkey: &str, 44) -> miette::Result<Option<Settings<'static>>> { 45 use jacquard_common::types::ident::AtIdentifier; 46 use jacquard_common::types::string::{Rkey as RkeyType, RecordKey}; 47 48 let client = reqwest::Client::new(); 49 let rkey_parsed = RkeyType::new(rkey).into_diagnostic()?; 50 51 let request = GetRecord::new() 52 .repo(AtIdentifier::Did(did.clone())) 53 .collection(CowStr::from("place.wisp.settings")) 54 .rkey(RecordKey::from(rkey_parsed)) 55 .build(); 56 57 match client.xrpc(pds_url.clone()).send(&request).await { 58 Ok(response) => { 59 let output = 
response.into_output().into_diagnostic()?; 60 61 // Parse the record value as Settings 62 match from_data::<Settings>(&output.value) { 63 Ok(settings) => { 64 Ok(Some(settings.into_static())) 65 } 66 Err(_) => { 67 // Settings record exists but couldn't parse - use defaults 68 Ok(None) 69 } 70 } 71 } 72 Err(_) => { 73 // Settings record doesn't exist 74 Ok(None) 75 } 76 } 77} 78 79/// Serve a site locally with real-time firehose updates 80pub async fn serve_site( 81 input: CowStr<'static>, 82 rkey: CowStr<'static>, 83 output_dir: PathBuf, 84 port: u16, 85) -> miette::Result<()> { 86 println!("Serving site {} from {} on port {}...", rkey, input, port); 87 88 // Resolve handle to DID if needed 89 use jacquard_identity::PublicResolver; 90 use jacquard::prelude::IdentityResolver; 91 92 let resolver = PublicResolver::default(); 93 let did = if input.starts_with("did:") { 94 Did::new(&input).into_diagnostic()? 95 } else { 96 // It's a handle, resolve it 97 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?; 98 resolver.resolve_handle(&handle).await.into_diagnostic()? 
99 }; 100 101 println!("Resolved to DID: {}", did.as_str()); 102 103 // Resolve PDS URL (needed for settings fetch) 104 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 105 106 // Create output directory if it doesn't exist 107 std::fs::create_dir_all(&output_dir).into_diagnostic()?; 108 109 // Initial pull of the site 110 println!("Performing initial pull..."); 111 let did_str = CowStr::from(did.as_str().to_string()); 112 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?; 113 114 // Fetch settings 115 let settings = fetch_settings(&pds_url, &did, rkey.as_ref()).await?; 116 if let Some(ref s) = settings { 117 println!("\nSettings loaded:"); 118 if let Some(true) = s.directory_listing { 119 println!(" • Directory listing: enabled"); 120 } 121 if let Some(ref spa_file) = s.spa_mode { 122 println!(" • SPA mode: enabled ({})", spa_file); 123 } 124 if let Some(ref custom404) = s.custom404 { 125 println!(" • Custom 404: {}", custom404); 126 } 127 } else { 128 println!("No settings configured (using defaults)"); 129 } 130 131 // Load redirect rules 132 let redirect_rules = load_redirect_rules(&output_dir); 133 if !redirect_rules.is_empty() { 134 println!("Loaded {} redirect rules from _redirects", redirect_rules.len()); 135 } 136 137 // Create shared state 138 let state = ServerState { 139 did: did_str.clone(), 140 rkey: rkey.clone(), 141 output_dir: output_dir.clone(), 142 last_cid: Arc::new(RwLock::new(None)), 143 redirect_rules: Arc::new(RwLock::new(redirect_rules)), 144 settings: Arc::new(RwLock::new(settings)), 145 }; 146 147 // Start firehose listener in background 148 let firehose_state = state.clone(); 149 tokio::spawn(async move { 150 if let Err(e) = watch_firehose(firehose_state).await { 151 eprintln!("Firehose error: {}", e); 152 } 153 }); 154 155 // Create HTTP server with gzip compression and redirect handling 156 let serve_dir = ServeDir::new(&output_dir).precompressed_gzip(); 157 158 let app = Router::new() 159 
.fallback(move |req: Request| { 160 let state = state.clone(); 161 let mut serve_dir = serve_dir.clone(); 162 async move { 163 handle_request_with_redirects(req, state, &mut serve_dir).await 164 } 165 }) 166 .layer(CompressionLayer::new()); 167 168 let addr = format!("0.0.0.0:{}", port); 169 let listener = tokio::net::TcpListener::bind(&addr) 170 .await 171 .into_diagnostic()?; 172 173 println!("\n✓ Server running at http://localhost:{}", port); 174 println!(" Watching for updates on the firehose...\n"); 175 176 axum::serve(listener, app).await.into_diagnostic()?; 177 178 Ok(()) 179} 180 181/// Serve a file for SPA mode 182async fn serve_file_for_spa(output_dir: &Path, spa_file: &str) -> Response { 183 let file_path = output_dir.join(spa_file.trim_start_matches('/')); 184 185 match tokio::fs::read(&file_path).await { 186 Ok(contents) => { 187 Response::builder() 188 .status(StatusCode::OK) 189 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 190 .body(Body::from(contents)) 191 .unwrap() 192 } 193 Err(_) => { 194 StatusCode::NOT_FOUND.into_response() 195 } 196 } 197} 198 199/// Serve custom 404 page 200async fn serve_custom_404(output_dir: &Path, custom404_file: &str) -> Response { 201 let file_path = output_dir.join(custom404_file.trim_start_matches('/')); 202 203 match tokio::fs::read(&file_path).await { 204 Ok(contents) => { 205 Response::builder() 206 .status(StatusCode::NOT_FOUND) 207 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 208 .body(Body::from(contents)) 209 .unwrap() 210 } 211 Err(_) => { 212 StatusCode::NOT_FOUND.into_response() 213 } 214 } 215} 216 217/// Serve directory listing 218async fn serve_directory_listing(dir_path: &Path, url_path: &str) -> Response { 219 match tokio::fs::read_dir(dir_path).await { 220 Ok(mut entries) => { 221 let mut html = String::from("<!DOCTYPE html><html><head><meta charset='utf-8'><title>Directory listing</title>"); 222 
html.push_str("<style>body{font-family:sans-serif;margin:2em}a{display:block;padding:0.5em;text-decoration:none;color:#0066cc}a:hover{background:#f0f0f0}</style>"); 223 html.push_str("</head><body>"); 224 html.push_str(&format!("<h1>Index of {}</h1>", url_path)); 225 html.push_str("<hr>"); 226 227 // Add parent directory link if not at root 228 if url_path != "/" { 229 let parent = if url_path.ends_with('/') { 230 format!("{}../", url_path) 231 } else { 232 format!("{}/", url_path.rsplitn(2, '/').nth(1).unwrap_or("/")) 233 }; 234 html.push_str(&format!("<a href='{}'>../</a>", parent)); 235 } 236 237 let mut items = Vec::new(); 238 while let Ok(Some(entry)) = entries.next_entry().await { 239 if let Ok(name) = entry.file_name().into_string() { 240 let is_dir = entry.path().is_dir(); 241 let display_name = if is_dir { 242 format!("{}/", name) 243 } else { 244 name.clone() 245 }; 246 247 let link_path = if url_path.ends_with('/') { 248 format!("{}{}", url_path, name) 249 } else { 250 format!("{}/{}", url_path, name) 251 }; 252 253 items.push((display_name, link_path, is_dir)); 254 } 255 } 256 257 // Sort: directories first, then alphabetically 258 items.sort_by(|a, b| { 259 match (a.2, b.2) { 260 (true, false) => std::cmp::Ordering::Less, 261 (false, true) => std::cmp::Ordering::Greater, 262 _ => a.0.cmp(&b.0), 263 } 264 }); 265 266 for (display_name, link_path, _) in items { 267 html.push_str(&format!("<a href='{}'>{}</a>", link_path, display_name)); 268 } 269 270 html.push_str("</body></html>"); 271 272 Response::builder() 273 .status(StatusCode::OK) 274 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 275 .body(Body::from(html)) 276 .unwrap() 277 } 278 Err(_) => { 279 StatusCode::NOT_FOUND.into_response() 280 } 281 } 282} 283 284/// Handle a request with redirect and settings support 285async fn handle_request_with_redirects( 286 req: Request, 287 state: ServerState, 288 serve_dir: &mut ServeDir, 289) -> Response { 290 let uri = req.uri().clone(); 291 let 
path = uri.path(); 292 let method = req.method().clone(); 293 294 // Parse query parameters 295 let query_params = uri.query().map(|q| { 296 let mut params = HashMap::new(); 297 for pair in q.split('&') { 298 if let Some((key, value)) = pair.split_once('=') { 299 params.insert(key.to_string(), value.to_string()); 300 } 301 } 302 params 303 }); 304 305 // Get settings 306 let settings = state.settings.read().await.clone(); 307 308 // Check for redirect rules first 309 let redirect_rules = state.redirect_rules.read().await; 310 if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) { 311 let is_force = redirect_match.force; 312 drop(redirect_rules); // Release the lock 313 314 // If not forced, check if the file exists first 315 if !is_force { 316 // Try to serve the file normally first 317 let test_req = Request::builder() 318 .uri(uri.clone()) 319 .method(&method) 320 .body(axum::body::Body::empty()) 321 .unwrap(); 322 323 match serve_dir.call(test_req).await { 324 Ok(response) if response.status().is_success() => { 325 // File exists and was served successfully, return it 326 return response.into_response(); 327 } 328 _ => { 329 // File doesn't exist or error, apply redirect 330 } 331 } 332 } 333 334 // Handle different status codes 335 match redirect_match.status { 336 200 => { 337 // Rewrite: serve the target file but keep the URL the same 338 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 339 let new_req = Request::builder() 340 .uri(target_uri) 341 .method(&method) 342 .body(axum::body::Body::empty()) 343 .unwrap(); 344 345 match serve_dir.call(new_req).await { 346 Ok(response) => response.into_response(), 347 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 348 } 349 } else { 350 StatusCode::INTERNAL_SERVER_ERROR.into_response() 351 } 352 } 353 301 => { 354 // Permanent redirect 355 Redirect::permanent(&redirect_match.target_path).into_response() 356 } 357 302 => { 358 // Temporary 
redirect 359 Redirect::temporary(&redirect_match.target_path).into_response() 360 } 361 404 => { 362 // Custom 404 page 363 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 364 let new_req = Request::builder() 365 .uri(target_uri) 366 .method(&method) 367 .body(axum::body::Body::empty()) 368 .unwrap(); 369 370 match serve_dir.call(new_req).await { 371 Ok(mut response) => { 372 *response.status_mut() = StatusCode::NOT_FOUND; 373 response.into_response() 374 } 375 Err(_) => StatusCode::NOT_FOUND.into_response(), 376 } 377 } else { 378 StatusCode::NOT_FOUND.into_response() 379 } 380 } 381 _ => { 382 // Unsupported status code, fall through to normal serving 383 match serve_dir.call(req).await { 384 Ok(response) => response.into_response(), 385 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 386 } 387 } 388 } 389 } else { 390 drop(redirect_rules); 391 392 // No redirect match, try to serve the file 393 let response_result = serve_dir.call(req).await; 394 395 match response_result { 396 Ok(response) if response.status().is_success() => { 397 // File served successfully 398 response.into_response() 399 } 400 Ok(response) if response.status() == StatusCode::NOT_FOUND => { 401 // File not found, check settings for fallback behavior 402 if let Some(ref settings) = settings { 403 // SPA mode takes precedence 404 if let Some(ref spa_file) = settings.spa_mode { 405 // Serve the SPA file for all non-file routes 406 return serve_file_for_spa(&state.output_dir, spa_file.as_ref()).await; 407 } 408 409 // Check if path is a directory and directory listing is enabled 410 if let Some(true) = settings.directory_listing { 411 let file_path = state.output_dir.join(path.trim_start_matches('/')); 412 if file_path.is_dir() { 413 return serve_directory_listing(&file_path, path).await; 414 } 415 } 416 417 // Check for custom 404 418 if let Some(ref custom404) = settings.custom404 { 419 return serve_custom_404(&state.output_dir, custom404.as_ref()).await; 420 } 
421 } 422 423 // No special handling, return 404 424 StatusCode::NOT_FOUND.into_response() 425 } 426 Ok(response) => response.into_response(), 427 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 428 } 429 } 430} 431 432/// Watch the firehose for updates to the specific site 433fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> { 434 Box::pin(async move { 435 use jacquard_identity::PublicResolver; 436 use jacquard::prelude::IdentityResolver; 437 438 // Resolve DID to PDS URL 439 let resolver = PublicResolver::default(); 440 let did = Did::new(&state.did).into_diagnostic()?; 441 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 442 443 println!("[PDS] Resolved DID to PDS: {}", pds_url); 444 445 // Convert HTTP(S) URL to WebSocket URL 446 let mut ws_url = pds_url.clone(); 447 let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" }; 448 ws_url.set_scheme(scheme) 449 .map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?; 450 451 println!("[PDS] Connecting to {}...", ws_url); 452 453 // Create subscription client 454 let client = TungsteniteSubscriptionClient::from_base_uri(ws_url); 455 456 // Subscribe to the PDS firehose 457 let params = SubscribeRepos::new().build(); 458 459 let stream = client.subscribe(&params).await.into_diagnostic()?; 460 println!("[PDS] Connected! 
Watching for updates..."); 461 462 // Convert to typed message stream 463 let (_sink, mut messages) = stream.into_stream(); 464 465 loop { 466 match messages.next().await { 467 Some(Ok(msg)) => { 468 if let Err(e) = handle_firehose_message(&state, msg).await { 469 eprintln!("[PDS] Error handling message: {}", e); 470 } 471 } 472 Some(Err(e)) => { 473 eprintln!("[PDS] Stream error: {}", e); 474 // Try to reconnect after a delay 475 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 476 return Box::pin(watch_firehose(state)).await; 477 } 478 None => { 479 println!("[PDS] Stream ended, reconnecting..."); 480 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 481 return Box::pin(watch_firehose(state)).await; 482 } 483 } 484 } 485 }) 486} 487 488/// Handle a firehose message 489async fn handle_firehose_message<'a>( 490 state: &ServerState, 491 msg: SubscribeReposMessage<'a>, 492) -> miette::Result<()> { 493 match msg { 494 SubscribeReposMessage::Commit(commit_msg) => { 495 // Check if this commit is from our DID 496 if commit_msg.repo.as_str() != state.did.as_str() { 497 return Ok(()); 498 } 499 500 // Check if any operation affects our site or settings 501 let site_path = format!("place.wisp.fs/{}", state.rkey); 502 let settings_path = format!("place.wisp.settings/{}", state.rkey); 503 let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == site_path); 504 let has_settings_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == settings_path); 505 506 if has_site_update { 507 // Debug: log all operations for this commit 508 println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey); 509 for op in &commit_msg.ops { 510 if op.path.as_ref() == site_path { 511 println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref()); 512 } 513 } 514 } 515 516 if has_site_update { 517 // Use the commit CID as the version tracker 518 let commit_cid = commit_msg.commit.to_string(); 519 520 // Check if this is a new 
commit 521 let should_update = { 522 let last_cid = state.last_cid.read().await; 523 Some(commit_cid.clone()) != *last_cid 524 }; 525 526 if should_update { 527 // Check operation types 528 let has_create_or_update = commit_msg.ops.iter().any(|op| { 529 op.path.as_ref() == site_path && 530 (op.action.as_ref() == "create" || op.action.as_ref() == "update") 531 }); 532 let has_delete = commit_msg.ops.iter().any(|op| { 533 op.path.as_ref() == site_path && op.action.as_ref() == "delete" 534 }); 535 536 // If there's a create/update, pull the site (even if there's also a delete in the same commit) 537 if has_create_or_update { 538 println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid); 539 println!("[Update] Pulling latest version..."); 540 541 // Pull the updated site 542 match pull_site( 543 state.did.clone(), 544 state.rkey.clone(), 545 state.output_dir.clone(), 546 ) 547 .await 548 { 549 Ok(_) => { 550 // Update last CID 551 let mut last_cid = state.last_cid.write().await; 552 *last_cid = Some(commit_cid); 553 554 // Reload redirect rules 555 let new_redirect_rules = load_redirect_rules(&state.output_dir); 556 let mut redirect_rules = state.redirect_rules.write().await; 557 *redirect_rules = new_redirect_rules; 558 559 println!("[Update] ✓ Site updated successfully!\n"); 560 } 561 Err(e) => { 562 eprintln!("[Update] Failed to pull site: {}", e); 563 } 564 } 565 } else if has_delete { 566 // Only a delete, no create/update 567 println!("\n[Update] Site {} was deleted", state.rkey); 568 569 // Update last CID so we don't process this commit again 570 let mut last_cid = state.last_cid.write().await; 571 *last_cid = Some(commit_cid); 572 } 573 } 574 } 575 576 // Handle settings updates 577 if has_settings_update { 578 println!("\n[Settings] Detected change to settings"); 579 580 // Resolve PDS URL 581 use jacquard_identity::PublicResolver; 582 use jacquard::prelude::IdentityResolver; 583 584 let resolver = PublicResolver::default(); 585 
let did = Did::new(&state.did).into_diagnostic()?; 586 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 587 588 // Fetch updated settings 589 match fetch_settings(&pds_url, &did, state.rkey.as_ref()).await { 590 Ok(new_settings) => { 591 let mut settings = state.settings.write().await; 592 *settings = new_settings.clone(); 593 drop(settings); 594 595 if let Some(ref s) = new_settings { 596 println!("[Settings] Updated:"); 597 if let Some(true) = s.directory_listing { 598 println!(" • Directory listing: enabled"); 599 } 600 if let Some(ref spa_file) = s.spa_mode { 601 println!(" • SPA mode: enabled ({})", spa_file); 602 } 603 if let Some(ref custom404) = s.custom404 { 604 println!(" • Custom 404: {}", custom404); 605 } 606 } else { 607 println!("[Settings] Cleared (using defaults)"); 608 } 609 } 610 Err(e) => { 611 eprintln!("[Settings] Failed to fetch updated settings: {}", e); 612 } 613 } 614 } 615 } 616 _ => { 617 // Ignore identity and account messages 618 } 619 } 620 621 Ok(()) 622} 623