Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
wisp.place
1use crate::pull::pull_site;
2use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule};
3use crate::place_wisp::settings::Settings;
4use axum::{
5 Router,
6 extract::Request,
7 response::{Response, IntoResponse, Redirect},
8 http::{StatusCode, Uri, header},
9 body::Body,
10};
11use jacquard::CowStr;
12use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
13use jacquard::api::com_atproto::repo::get_record::GetRecord;
14use jacquard_common::types::string::Did;
15use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient, XrpcExt};
16use jacquard_common::IntoStatic;
17use jacquard_common::types::value::from_data;
18use miette::IntoDiagnostic;
19use n0_future::StreamExt;
20use std::collections::HashMap;
21use std::path::{PathBuf, Path};
22use std::sync::Arc;
23use tokio::sync::RwLock;
24use tower::Service;
25use tower_http::compression::CompressionLayer;
26use tower_http::services::ServeDir;
27
28/// Shared state for the server
29#[derive(Clone)]
30struct ServerState {
31 did: CowStr<'static>,
32 rkey: CowStr<'static>,
33 output_dir: PathBuf,
34 last_cid: Arc<RwLock<Option<String>>>,
35 redirect_rules: Arc<RwLock<Vec<RedirectRule>>>,
36 settings: Arc<RwLock<Option<Settings<'static>>>>,
37}
38
39/// Fetch settings for a site from the PDS
40async fn fetch_settings(
41 pds_url: &url::Url,
42 did: &Did<'_>,
43 rkey: &str,
44) -> miette::Result<Option<Settings<'static>>> {
45 use jacquard_common::types::ident::AtIdentifier;
46 use jacquard_common::types::string::{Rkey as RkeyType, RecordKey};
47
48 let client = reqwest::Client::new();
49 let rkey_parsed = RkeyType::new(rkey).into_diagnostic()?;
50
51 let request = GetRecord::new()
52 .repo(AtIdentifier::Did(did.clone()))
53 .collection(CowStr::from("place.wisp.settings"))
54 .rkey(RecordKey::from(rkey_parsed))
55 .build();
56
57 match client.xrpc(pds_url.clone()).send(&request).await {
58 Ok(response) => {
59 let output = response.into_output().into_diagnostic()?;
60
61 // Parse the record value as Settings
62 match from_data::<Settings>(&output.value) {
63 Ok(settings) => {
64 Ok(Some(settings.into_static()))
65 }
66 Err(_) => {
67 // Settings record exists but couldn't parse - use defaults
68 Ok(None)
69 }
70 }
71 }
72 Err(_) => {
73 // Settings record doesn't exist
74 Ok(None)
75 }
76 }
77}
78
79/// Serve a site locally with real-time firehose updates
80pub async fn serve_site(
81 input: CowStr<'static>,
82 rkey: CowStr<'static>,
83 output_dir: PathBuf,
84 port: u16,
85) -> miette::Result<()> {
86 println!("Serving site {} from {} on port {}...", rkey, input, port);
87
88 // Resolve handle to DID if needed
89 use jacquard_identity::PublicResolver;
90 use jacquard::prelude::IdentityResolver;
91
92 let resolver = PublicResolver::default();
93 let did = if input.starts_with("did:") {
94 Did::new(&input).into_diagnostic()?
95 } else {
96 // It's a handle, resolve it
97 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
98 resolver.resolve_handle(&handle).await.into_diagnostic()?
99 };
100
101 println!("Resolved to DID: {}", did.as_str());
102
103 // Resolve PDS URL (needed for settings fetch)
104 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
105
106 // Create output directory if it doesn't exist
107 std::fs::create_dir_all(&output_dir).into_diagnostic()?;
108
109 // Initial pull of the site
110 println!("Performing initial pull...");
111 let did_str = CowStr::from(did.as_str().to_string());
112 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?;
113
114 // Fetch settings
115 let settings = fetch_settings(&pds_url, &did, rkey.as_ref()).await?;
116 if let Some(ref s) = settings {
117 println!("\nSettings loaded:");
118 if let Some(true) = s.directory_listing {
119 println!(" • Directory listing: enabled");
120 }
121 if let Some(ref spa_file) = s.spa_mode {
122 println!(" • SPA mode: enabled ({})", spa_file);
123 }
124 if let Some(ref custom404) = s.custom404 {
125 println!(" • Custom 404: {}", custom404);
126 }
127 } else {
128 println!("No settings configured (using defaults)");
129 }
130
131 // Load redirect rules
132 let redirect_rules = load_redirect_rules(&output_dir);
133 if !redirect_rules.is_empty() {
134 println!("Loaded {} redirect rules from _redirects", redirect_rules.len());
135 }
136
137 // Create shared state
138 let state = ServerState {
139 did: did_str.clone(),
140 rkey: rkey.clone(),
141 output_dir: output_dir.clone(),
142 last_cid: Arc::new(RwLock::new(None)),
143 redirect_rules: Arc::new(RwLock::new(redirect_rules)),
144 settings: Arc::new(RwLock::new(settings)),
145 };
146
147 // Start firehose listener in background
148 let firehose_state = state.clone();
149 tokio::spawn(async move {
150 if let Err(e) = watch_firehose(firehose_state).await {
151 eprintln!("Firehose error: {}", e);
152 }
153 });
154
155 // Create HTTP server with gzip compression and redirect handling
156 let serve_dir = ServeDir::new(&output_dir).precompressed_gzip();
157
158 let app = Router::new()
159 .fallback(move |req: Request| {
160 let state = state.clone();
161 let mut serve_dir = serve_dir.clone();
162 async move {
163 handle_request_with_redirects(req, state, &mut serve_dir).await
164 }
165 })
166 .layer(CompressionLayer::new());
167
168 let addr = format!("0.0.0.0:{}", port);
169 let listener = tokio::net::TcpListener::bind(&addr)
170 .await
171 .into_diagnostic()?;
172
173 println!("\n✓ Server running at http://localhost:{}", port);
174 println!(" Watching for updates on the firehose...\n");
175
176 axum::serve(listener, app).await.into_diagnostic()?;
177
178 Ok(())
179}
180
181/// Serve a file for SPA mode
182async fn serve_file_for_spa(output_dir: &Path, spa_file: &str) -> Response {
183 let file_path = output_dir.join(spa_file.trim_start_matches('/'));
184
185 match tokio::fs::read(&file_path).await {
186 Ok(contents) => {
187 Response::builder()
188 .status(StatusCode::OK)
189 .header(header::CONTENT_TYPE, "text/html; charset=utf-8")
190 .body(Body::from(contents))
191 .unwrap()
192 }
193 Err(_) => {
194 StatusCode::NOT_FOUND.into_response()
195 }
196 }
197}
198
199/// Serve custom 404 page
200async fn serve_custom_404(output_dir: &Path, custom404_file: &str) -> Response {
201 let file_path = output_dir.join(custom404_file.trim_start_matches('/'));
202
203 match tokio::fs::read(&file_path).await {
204 Ok(contents) => {
205 Response::builder()
206 .status(StatusCode::NOT_FOUND)
207 .header(header::CONTENT_TYPE, "text/html; charset=utf-8")
208 .body(Body::from(contents))
209 .unwrap()
210 }
211 Err(_) => {
212 StatusCode::NOT_FOUND.into_response()
213 }
214 }
215}
216
217/// Serve directory listing
218async fn serve_directory_listing(dir_path: &Path, url_path: &str) -> Response {
219 match tokio::fs::read_dir(dir_path).await {
220 Ok(mut entries) => {
221 let mut html = String::from("<!DOCTYPE html><html><head><meta charset='utf-8'><title>Directory listing</title>");
222 html.push_str("<style>body{font-family:sans-serif;margin:2em}a{display:block;padding:0.5em;text-decoration:none;color:#0066cc}a:hover{background:#f0f0f0}</style>");
223 html.push_str("</head><body>");
224 html.push_str(&format!("<h1>Index of {}</h1>", url_path));
225 html.push_str("<hr>");
226
227 // Add parent directory link if not at root
228 if url_path != "/" {
229 let parent = if url_path.ends_with('/') {
230 format!("{}../", url_path)
231 } else {
232 format!("{}/", url_path.rsplitn(2, '/').nth(1).unwrap_or("/"))
233 };
234 html.push_str(&format!("<a href='{}'>../</a>", parent));
235 }
236
237 let mut items = Vec::new();
238 while let Ok(Some(entry)) = entries.next_entry().await {
239 if let Ok(name) = entry.file_name().into_string() {
240 let is_dir = entry.path().is_dir();
241 let display_name = if is_dir {
242 format!("{}/", name)
243 } else {
244 name.clone()
245 };
246
247 let link_path = if url_path.ends_with('/') {
248 format!("{}{}", url_path, name)
249 } else {
250 format!("{}/{}", url_path, name)
251 };
252
253 items.push((display_name, link_path, is_dir));
254 }
255 }
256
257 // Sort: directories first, then alphabetically
258 items.sort_by(|a, b| {
259 match (a.2, b.2) {
260 (true, false) => std::cmp::Ordering::Less,
261 (false, true) => std::cmp::Ordering::Greater,
262 _ => a.0.cmp(&b.0),
263 }
264 });
265
266 for (display_name, link_path, _) in items {
267 html.push_str(&format!("<a href='{}'>{}</a>", link_path, display_name));
268 }
269
270 html.push_str("</body></html>");
271
272 Response::builder()
273 .status(StatusCode::OK)
274 .header(header::CONTENT_TYPE, "text/html; charset=utf-8")
275 .body(Body::from(html))
276 .unwrap()
277 }
278 Err(_) => {
279 StatusCode::NOT_FOUND.into_response()
280 }
281 }
282}
283
284/// Handle a request with redirect and settings support
285async fn handle_request_with_redirects(
286 req: Request,
287 state: ServerState,
288 serve_dir: &mut ServeDir,
289) -> Response {
290 let uri = req.uri().clone();
291 let path = uri.path();
292 let method = req.method().clone();
293
294 // Parse query parameters
295 let query_params = uri.query().map(|q| {
296 let mut params = HashMap::new();
297 for pair in q.split('&') {
298 if let Some((key, value)) = pair.split_once('=') {
299 params.insert(key.to_string(), value.to_string());
300 }
301 }
302 params
303 });
304
305 // Get settings
306 let settings = state.settings.read().await.clone();
307
308 // Check for redirect rules first
309 let redirect_rules = state.redirect_rules.read().await;
310 if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) {
311 let is_force = redirect_match.force;
312 drop(redirect_rules); // Release the lock
313
314 // If not forced, check if the file exists first
315 if !is_force {
316 // Try to serve the file normally first
317 let test_req = Request::builder()
318 .uri(uri.clone())
319 .method(&method)
320 .body(axum::body::Body::empty())
321 .unwrap();
322
323 match serve_dir.call(test_req).await {
324 Ok(response) if response.status().is_success() => {
325 // File exists and was served successfully, return it
326 return response.into_response();
327 }
328 _ => {
329 // File doesn't exist or error, apply redirect
330 }
331 }
332 }
333
334 // Handle different status codes
335 match redirect_match.status {
336 200 => {
337 // Rewrite: serve the target file but keep the URL the same
338 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
339 let new_req = Request::builder()
340 .uri(target_uri)
341 .method(&method)
342 .body(axum::body::Body::empty())
343 .unwrap();
344
345 match serve_dir.call(new_req).await {
346 Ok(response) => response.into_response(),
347 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
348 }
349 } else {
350 StatusCode::INTERNAL_SERVER_ERROR.into_response()
351 }
352 }
353 301 => {
354 // Permanent redirect
355 Redirect::permanent(&redirect_match.target_path).into_response()
356 }
357 302 => {
358 // Temporary redirect
359 Redirect::temporary(&redirect_match.target_path).into_response()
360 }
361 404 => {
362 // Custom 404 page
363 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
364 let new_req = Request::builder()
365 .uri(target_uri)
366 .method(&method)
367 .body(axum::body::Body::empty())
368 .unwrap();
369
370 match serve_dir.call(new_req).await {
371 Ok(mut response) => {
372 *response.status_mut() = StatusCode::NOT_FOUND;
373 response.into_response()
374 }
375 Err(_) => StatusCode::NOT_FOUND.into_response(),
376 }
377 } else {
378 StatusCode::NOT_FOUND.into_response()
379 }
380 }
381 _ => {
382 // Unsupported status code, fall through to normal serving
383 match serve_dir.call(req).await {
384 Ok(response) => response.into_response(),
385 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
386 }
387 }
388 }
389 } else {
390 drop(redirect_rules);
391
392 // No redirect match, try to serve the file
393 let response_result = serve_dir.call(req).await;
394
395 match response_result {
396 Ok(response) if response.status().is_success() => {
397 // File served successfully
398 response.into_response()
399 }
400 Ok(response) if response.status() == StatusCode::NOT_FOUND => {
401 // File not found, check settings for fallback behavior
402 if let Some(ref settings) = settings {
403 // SPA mode takes precedence
404 if let Some(ref spa_file) = settings.spa_mode {
405 // Serve the SPA file for all non-file routes
406 return serve_file_for_spa(&state.output_dir, spa_file.as_ref()).await;
407 }
408
409 // Check if path is a directory and directory listing is enabled
410 if let Some(true) = settings.directory_listing {
411 let file_path = state.output_dir.join(path.trim_start_matches('/'));
412 if file_path.is_dir() {
413 return serve_directory_listing(&file_path, path).await;
414 }
415 }
416
417 // Check for custom 404
418 if let Some(ref custom404) = settings.custom404 {
419 return serve_custom_404(&state.output_dir, custom404.as_ref()).await;
420 }
421 }
422
423 // No special handling, return 404
424 StatusCode::NOT_FOUND.into_response()
425 }
426 Ok(response) => response.into_response(),
427 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
428 }
429 }
430}
431
432/// Watch the firehose for updates to the specific site
433fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> {
434 Box::pin(async move {
435 use jacquard_identity::PublicResolver;
436 use jacquard::prelude::IdentityResolver;
437
438 // Resolve DID to PDS URL
439 let resolver = PublicResolver::default();
440 let did = Did::new(&state.did).into_diagnostic()?;
441 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
442
443 println!("[PDS] Resolved DID to PDS: {}", pds_url);
444
445 // Convert HTTP(S) URL to WebSocket URL
446 let mut ws_url = pds_url.clone();
447 let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" };
448 ws_url.set_scheme(scheme)
449 .map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?;
450
451 println!("[PDS] Connecting to {}...", ws_url);
452
453 // Create subscription client
454 let client = TungsteniteSubscriptionClient::from_base_uri(ws_url);
455
456 // Subscribe to the PDS firehose
457 let params = SubscribeRepos::new().build();
458
459 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
460 println!("[PDS] Connected! Watching for updates...");
461
462 // Convert to typed message stream
463 let (_sink, mut messages) = stream.into_stream();
464
465 loop {
466 match messages.next().await {
467 Some(Ok(msg)) => {
468 if let Err(e) = handle_firehose_message(&state, msg).await {
469 eprintln!("[PDS] Error handling message: {}", e);
470 }
471 }
472 Some(Err(e)) => {
473 eprintln!("[PDS] Stream error: {}", e);
474 // Try to reconnect after a delay
475 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
476 return Box::pin(watch_firehose(state)).await;
477 }
478 None => {
479 println!("[PDS] Stream ended, reconnecting...");
480 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
481 return Box::pin(watch_firehose(state)).await;
482 }
483 }
484 }
485 })
486}
487
488/// Handle a firehose message
489async fn handle_firehose_message<'a>(
490 state: &ServerState,
491 msg: SubscribeReposMessage<'a>,
492) -> miette::Result<()> {
493 match msg {
494 SubscribeReposMessage::Commit(commit_msg) => {
495 // Check if this commit is from our DID
496 if commit_msg.repo.as_str() != state.did.as_str() {
497 return Ok(());
498 }
499
500 // Check if any operation affects our site or settings
501 let site_path = format!("place.wisp.fs/{}", state.rkey);
502 let settings_path = format!("place.wisp.settings/{}", state.rkey);
503 let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == site_path);
504 let has_settings_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == settings_path);
505
506 if has_site_update {
507 // Debug: log all operations for this commit
508 println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey);
509 for op in &commit_msg.ops {
510 if op.path.as_ref() == site_path {
511 println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref());
512 }
513 }
514 }
515
516 if has_site_update {
517 // Use the commit CID as the version tracker
518 let commit_cid = commit_msg.commit.to_string();
519
520 // Check if this is a new commit
521 let should_update = {
522 let last_cid = state.last_cid.read().await;
523 Some(commit_cid.clone()) != *last_cid
524 };
525
526 if should_update {
527 // Check operation types
528 let has_create_or_update = commit_msg.ops.iter().any(|op| {
529 op.path.as_ref() == site_path &&
530 (op.action.as_ref() == "create" || op.action.as_ref() == "update")
531 });
532 let has_delete = commit_msg.ops.iter().any(|op| {
533 op.path.as_ref() == site_path && op.action.as_ref() == "delete"
534 });
535
536 // If there's a create/update, pull the site (even if there's also a delete in the same commit)
537 if has_create_or_update {
538 println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid);
539 println!("[Update] Pulling latest version...");
540
541 // Pull the updated site
542 match pull_site(
543 state.did.clone(),
544 state.rkey.clone(),
545 state.output_dir.clone(),
546 )
547 .await
548 {
549 Ok(_) => {
550 // Update last CID
551 let mut last_cid = state.last_cid.write().await;
552 *last_cid = Some(commit_cid);
553
554 // Reload redirect rules
555 let new_redirect_rules = load_redirect_rules(&state.output_dir);
556 let mut redirect_rules = state.redirect_rules.write().await;
557 *redirect_rules = new_redirect_rules;
558
559 println!("[Update] ✓ Site updated successfully!\n");
560 }
561 Err(e) => {
562 eprintln!("[Update] Failed to pull site: {}", e);
563 }
564 }
565 } else if has_delete {
566 // Only a delete, no create/update
567 println!("\n[Update] Site {} was deleted", state.rkey);
568
569 // Update last CID so we don't process this commit again
570 let mut last_cid = state.last_cid.write().await;
571 *last_cid = Some(commit_cid);
572 }
573 }
574 }
575
576 // Handle settings updates
577 if has_settings_update {
578 println!("\n[Settings] Detected change to settings");
579
580 // Resolve PDS URL
581 use jacquard_identity::PublicResolver;
582 use jacquard::prelude::IdentityResolver;
583
584 let resolver = PublicResolver::default();
585 let did = Did::new(&state.did).into_diagnostic()?;
586 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
587
588 // Fetch updated settings
589 match fetch_settings(&pds_url, &did, state.rkey.as_ref()).await {
590 Ok(new_settings) => {
591 let mut settings = state.settings.write().await;
592 *settings = new_settings.clone();
593 drop(settings);
594
595 if let Some(ref s) = new_settings {
596 println!("[Settings] Updated:");
597 if let Some(true) = s.directory_listing {
598 println!(" • Directory listing: enabled");
599 }
600 if let Some(ref spa_file) = s.spa_mode {
601 println!(" • SPA mode: enabled ({})", spa_file);
602 }
603 if let Some(ref custom404) = s.custom404 {
604 println!(" • Custom 404: {}", custom404);
605 }
606 } else {
607 println!("[Settings] Cleared (using defaults)");
608 }
609 }
610 Err(e) => {
611 eprintln!("[Settings] Failed to fetch updated settings: {}", e);
612 }
613 }
614 }
615 }
616 _ => {
617 // Ignore identity and account messages
618 }
619 }
620
621 Ok(())
622}
623