Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place
1use crate::pull::pull_site; 2use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule}; 3use axum::{ 4 Router, 5 extract::Request, 6 response::{Response, IntoResponse, Redirect}, 7 http::{StatusCode, Uri}, 8}; 9use jacquard::CowStr; 10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 11use jacquard_common::types::string::Did; 12use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 13use miette::IntoDiagnostic; 14use n0_future::StreamExt; 15use std::collections::HashMap; 16use std::path::PathBuf; 17use std::sync::Arc; 18use tokio::sync::RwLock; 19use tower::Service; 20use tower_http::compression::CompressionLayer; 21use tower_http::services::ServeDir; 22 23/// Shared state for the server 24#[derive(Clone)] 25struct ServerState { 26 did: CowStr<'static>, 27 rkey: CowStr<'static>, 28 output_dir: PathBuf, 29 last_cid: Arc<RwLock<Option<String>>>, 30 redirect_rules: Arc<RwLock<Vec<RedirectRule>>>, 31} 32 33/// Serve a site locally with real-time firehose updates 34pub async fn serve_site( 35 input: CowStr<'static>, 36 rkey: CowStr<'static>, 37 output_dir: PathBuf, 38 port: u16, 39) -> miette::Result<()> { 40 println!("Serving site {} from {} on port {}...", rkey, input, port); 41 42 // Resolve handle to DID if needed 43 use jacquard_identity::PublicResolver; 44 use jacquard::prelude::IdentityResolver; 45 46 let resolver = PublicResolver::default(); 47 let did = if input.starts_with("did:") { 48 Did::new(&input).into_diagnostic()? 49 } else { 50 // It's a handle, resolve it 51 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?; 52 resolver.resolve_handle(&handle).await.into_diagnostic()? 53 }; 54 55 println!("Resolved to DID: {}", did.as_str()); 56 57 // Create output directory if it doesn't exist 58 std::fs::create_dir_all(&output_dir).into_diagnostic()?; 59 60 // Initial pull of the site 61 println!("Performing initial pull..."); 62 let did_str = CowStr::from(did.as_str().to_string()); 63 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?; 64 65 // Load redirect rules 66 let redirect_rules = load_redirect_rules(&output_dir); 67 if !redirect_rules.is_empty() { 68 println!("Loaded {} redirect rules from _redirects", redirect_rules.len()); 69 } 70 71 // Create shared state 72 let state = ServerState { 73 did: did_str.clone(), 74 rkey: rkey.clone(), 75 output_dir: output_dir.clone(), 76 last_cid: Arc::new(RwLock::new(None)), 77 redirect_rules: Arc::new(RwLock::new(redirect_rules)), 78 }; 79 80 // Start firehose listener in background 81 let firehose_state = state.clone(); 82 tokio::spawn(async move { 83 if let Err(e) = watch_firehose(firehose_state).await { 84 eprintln!("Firehose error: {}", e); 85 } 86 }); 87 88 // Create HTTP server with gzip compression and redirect handling 89 let serve_dir = ServeDir::new(&output_dir).precompressed_gzip(); 90 91 let app = Router::new() 92 .fallback(move |req: Request| { 93 let state = state.clone(); 94 let mut serve_dir = serve_dir.clone(); 95 async move { 96 handle_request_with_redirects(req, state, &mut serve_dir).await 97 } 98 }) 99 .layer(CompressionLayer::new()); 100 101 let addr = format!("0.0.0.0:{}", port); 102 let listener = tokio::net::TcpListener::bind(&addr) 103 .await 104 .into_diagnostic()?; 105 106 println!("\n✓ Server running at http://localhost:{}", port); 107 println!(" Watching for updates on the firehose...\n"); 108 109 axum::serve(listener, app).await.into_diagnostic()?; 110 111 Ok(()) 112} 113 114/// Handle a request with redirect support 115async fn handle_request_with_redirects( 116 req: Request, 117 state: ServerState, 118 serve_dir: &mut ServeDir, 119) -> Response { 120 let uri = req.uri().clone(); 121 let path = uri.path(); 122 let method = req.method().clone(); 123 124 // Parse query parameters 125 let query_params = uri.query().map(|q| { 126 let mut params = HashMap::new(); 127 for pair in q.split('&') { 128 if let Some((key, value)) = pair.split_once('=') { 129 params.insert(key.to_string(), value.to_string()); 130 } 131 } 132 params 133 }); 134 135 // Check for redirect rules 136 let redirect_rules = state.redirect_rules.read().await; 137 if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) { 138 let is_force = redirect_match.force; 139 drop(redirect_rules); // Release the lock 140 141 // If not forced, check if the file exists first 142 if !is_force { 143 // Try to serve the file normally first 144 let test_req = Request::builder() 145 .uri(uri.clone()) 146 .method(&method) 147 .body(axum::body::Body::empty()) 148 .unwrap(); 149 150 match serve_dir.call(test_req).await { 151 Ok(response) if response.status().is_success() => { 152 // File exists and was served successfully, return it 153 return response.into_response(); 154 } 155 _ => { 156 // File doesn't exist or error, apply redirect 157 } 158 } 159 } 160 161 // Handle different status codes 162 match redirect_match.status { 163 200 => { 164 // Rewrite: serve the target file but keep the URL the same 165 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 166 let new_req = Request::builder() 167 .uri(target_uri) 168 .method(&method) 169 .body(axum::body::Body::empty()) 170 .unwrap(); 171 172 match serve_dir.call(new_req).await { 173 Ok(response) => response.into_response(), 174 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 175 } 176 } else { 177 StatusCode::INTERNAL_SERVER_ERROR.into_response() 178 } 179 } 180 301 => { 181 // Permanent redirect 182 Redirect::permanent(&redirect_match.target_path).into_response() 183 } 184 302 => { 185 // Temporary redirect 186 Redirect::temporary(&redirect_match.target_path).into_response() 187 } 188 404 => { 189 // Custom 404 page 190 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 191 let new_req = Request::builder() 192 .uri(target_uri) 193 .method(&method) 194 .body(axum::body::Body::empty()) 195 .unwrap(); 196 197 match serve_dir.call(new_req).await { 198 Ok(mut response) => { 199 *response.status_mut() = StatusCode::NOT_FOUND; 200 response.into_response() 201 } 202 Err(_) => StatusCode::NOT_FOUND.into_response(), 203 } 204 } else { 205 StatusCode::NOT_FOUND.into_response() 206 } 207 } 208 _ => { 209 // Unsupported status code, fall through to normal serving 210 match serve_dir.call(req).await { 211 Ok(response) => response.into_response(), 212 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 213 } 214 } 215 } 216 } else { 217 drop(redirect_rules); 218 // No redirect match, serve normally 219 match serve_dir.call(req).await { 220 Ok(response) => response.into_response(), 221 Err(_) => StatusCode::NOT_FOUND.into_response(), 222 } 223 } 224} 225 226/// Watch the firehose for updates to the specific site 227fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> { 228 Box::pin(async move { 229 use jacquard_identity::PublicResolver; 230 use jacquard::prelude::IdentityResolver; 231 232 // Resolve DID to PDS URL 233 let resolver = PublicResolver::default(); 234 let did = Did::new(&state.did).into_diagnostic()?; 235 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 236 237 println!("[PDS] Resolved DID to PDS: {}", pds_url); 238 239 // Convert HTTP(S) URL to WebSocket URL 240 let mut ws_url = pds_url.clone(); 241 let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" }; 242 ws_url.set_scheme(scheme) 243 .map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?; 244 245 println!("[PDS] Connecting to {}...", ws_url); 246 247 // Create subscription client 248 let client = TungsteniteSubscriptionClient::from_base_uri(ws_url); 249 250 // Subscribe to the PDS firehose 251 let params = SubscribeRepos::new().build(); 252 253 let stream = client.subscribe(&params).await.into_diagnostic()?; 254 println!("[PDS] Connected! Watching for updates..."); 255 256 // Convert to typed message stream 257 let (_sink, mut messages) = stream.into_stream(); 258 259 loop { 260 match messages.next().await { 261 Some(Ok(msg)) => { 262 if let Err(e) = handle_firehose_message(&state, msg).await { 263 eprintln!("[PDS] Error handling message: {}", e); 264 } 265 } 266 Some(Err(e)) => { 267 eprintln!("[PDS] Stream error: {}", e); 268 // Try to reconnect after a delay 269 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 270 return Box::pin(watch_firehose(state)).await; 271 } 272 None => { 273 println!("[PDS] Stream ended, reconnecting..."); 274 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 275 return Box::pin(watch_firehose(state)).await; 276 } 277 } 278 } 279 }) 280} 281 282/// Handle a firehose message 283async fn handle_firehose_message<'a>( 284 state: &ServerState, 285 msg: SubscribeReposMessage<'a>, 286) -> miette::Result<()> { 287 match msg { 288 SubscribeReposMessage::Commit(commit_msg) => { 289 // Check if this commit is from our DID 290 if commit_msg.repo.as_str() != state.did.as_str() { 291 return Ok(()); 292 } 293 294 // Check if any operation affects our site 295 let target_path = format!("place.wisp.fs/{}", state.rkey); 296 let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == target_path); 297 298 if has_site_update { 299 // Debug: log all operations for this commit 300 println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey); 301 for op in &commit_msg.ops { 302 if op.path.as_ref() == target_path { 303 println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref()); 304 } 305 } 306 } 307 308 if has_site_update { 309 // Use the commit CID as the version tracker 310 let commit_cid = commit_msg.commit.to_string(); 311 312 // Check if this is a new commit 313 let should_update = { 314 let last_cid = state.last_cid.read().await; 315 Some(commit_cid.clone()) != *last_cid 316 }; 317 318 if should_update { 319 // Check operation types 320 let has_create_or_update = commit_msg.ops.iter().any(|op| { 321 op.path.as_ref() == target_path && 322 (op.action.as_ref() == "create" || op.action.as_ref() == "update") 323 }); 324 let has_delete = commit_msg.ops.iter().any(|op| { 325 op.path.as_ref() == target_path && op.action.as_ref() == "delete" 326 }); 327 328 // If there's a create/update, pull the site (even if there's also a delete in the same commit) 329 if has_create_or_update { 330 println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid); 331 println!("[Update] Pulling latest version..."); 332 333 // Pull the updated site 334 match pull_site( 335 state.did.clone(), 336 state.rkey.clone(), 337 state.output_dir.clone(), 338 ) 339 .await 340 { 341 Ok(_) => { 342 // Update last CID 343 let mut last_cid = state.last_cid.write().await; 344 *last_cid = Some(commit_cid); 345 346 // Reload redirect rules 347 let new_redirect_rules = load_redirect_rules(&state.output_dir); 348 let mut redirect_rules = state.redirect_rules.write().await; 349 *redirect_rules = new_redirect_rules; 350 351 println!("[Update] ✓ Site updated successfully!\n"); 352 } 353 Err(e) => { 354 eprintln!("[Update] Failed to pull site: {}", e); 355 } 356 } 357 } else if has_delete { 358 // Only a delete, no create/update 359 println!("\n[Update] Site {} was deleted", state.rkey); 360 361 // Update last CID so we don't process this commit again 362 let mut last_cid = state.last_cid.write().await; 363 *last_cid = Some(commit_cid); 364 } 365 } 366 } 367 } 368 _ => { 369 // Ignore identity and account messages 370 } 371 } 372 373 Ok(()) 374} 375