Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

use crate::pull::pull_site;
use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule};
use axum::{
    Router,
    extract::Request,
    response::{Response, IntoResponse, Redirect},
    http::{StatusCode, Uri},
};
use jacquard::CowStr;
use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
use jacquard_common::types::string::Did;
use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
use miette::IntoDiagnostic;
use n0_future::StreamExt;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tower::Service;
use tower_http::compression::CompressionLayer;
use tower_http::services::ServeDir;

/// Shared state for the server
#[derive(Clone)]
struct ServerState {
    did: CowStr<'static>,
    rkey: CowStr<'static>,
    output_dir: PathBuf,
    last_cid: Arc<RwLock<Option<String>>>,
    redirect_rules: Arc<RwLock<Vec<RedirectRule>>>,
}

/// Serve a site locally with real-time firehose updates
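///
/// Resolves `input` (a handle or DID), performs an initial `pull_site` into
/// `output_dir`, spawns a background firehose watcher, and serves the directory
/// over HTTP with gzip compression and `_redirects` handling.
///
/// A minimal caller sketch (illustrative values only, not part of the crate):
///
/// ```ignore
/// serve_site(
///     CowStr::from("example.bsky.social"), // handle or DID (hypothetical value)
///     CowStr::from("my-site"),             // record key of the site (hypothetical value)
///     PathBuf::from("./site"),             // local output directory
///     8080,                                // port to listen on
/// )
/// .await?;
/// ```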
pub async fn serve_site(
    input: CowStr<'static>,
    rkey: CowStr<'static>,
    output_dir: PathBuf,
    port: u16,
) -> miette::Result<()> {
    println!("Serving site {} from {} on port {}...", rkey, input, port);

    // Resolve handle to DID if needed
    use jacquard_identity::PublicResolver;
    use jacquard::prelude::IdentityResolver;

    let resolver = PublicResolver::default();
    let did = if input.starts_with("did:") {
        Did::new(&input).into_diagnostic()?
    } else {
        // It's a handle, resolve it
        let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
        resolver.resolve_handle(&handle).await.into_diagnostic()?
    };

    println!("Resolved to DID: {}", did.as_str());

    // Create output directory if it doesn't exist
    std::fs::create_dir_all(&output_dir).into_diagnostic()?;

    // Initial pull of the site
    println!("Performing initial pull...");
    let did_str = CowStr::from(did.as_str().to_string());
    pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?;

    // Load redirect rules
    let redirect_rules = load_redirect_rules(&output_dir);
    if !redirect_rules.is_empty() {
        println!("Loaded {} redirect rules from _redirects", redirect_rules.len());
    }

    // Create shared state
    let state = ServerState {
        did: did_str.clone(),
        rkey: rkey.clone(),
        output_dir: output_dir.clone(),
        last_cid: Arc::new(RwLock::new(None)),
        redirect_rules: Arc::new(RwLock::new(redirect_rules)),
    };

    // Start firehose listener in background
    let firehose_state = state.clone();
    tokio::spawn(async move {
        if let Err(e) = watch_firehose(firehose_state).await {
            eprintln!("Firehose error: {}", e);
        }
    });

    // Create HTTP server with gzip compression and redirect handling
    let serve_dir = ServeDir::new(&output_dir).precompressed_gzip();

    let app = Router::new()
        .fallback(move |req: Request| {
            let state = state.clone();
            let mut serve_dir = serve_dir.clone();
            async move {
                handle_request_with_redirects(req, state, &mut serve_dir).await
            }
        })
        .layer(CompressionLayer::new());

    let addr = format!("0.0.0.0:{}", port);
    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .into_diagnostic()?;

    println!("\n✓ Server running at http://localhost:{}", port);
    println!(" Watching for updates on the firehose...\n");

    axum::serve(listener, app).await.into_diagnostic()?;

    Ok(())
}

/// Handle a request with redirect support
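///
/// Rules come from the site's `_redirects` file and are reloaded after each
/// pull. A 200 rule rewrites to the target path while keeping the request URL,
/// 301 and 302 issue redirects, 404 serves the target as a custom not-found
/// page, and non-forced rules only apply when the requested file is missing.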
async fn handle_request_with_redirects(
    req: Request,
    state: ServerState,
    serve_dir: &mut ServeDir,
) -> Response {
    let uri = req.uri().clone();
    let path = uri.path();
    let method = req.method().clone();

    // Parse query parameters
    let query_params = uri.query().map(|q| {
        let mut params = HashMap::new();
        for pair in q.split('&') {
            if let Some((key, value)) = pair.split_once('=') {
                params.insert(key.to_string(), value.to_string());
            }
        }
        params
    });

    // Check for redirect rules
    let redirect_rules = state.redirect_rules.read().await;
    if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) {
        let is_force = redirect_match.force;
        drop(redirect_rules); // Release the lock

        // If not forced, check if the file exists first
        if !is_force {
            // Try to serve the file normally first
            let test_req = Request::builder()
                .uri(uri.clone())
                .method(&method)
                .body(axum::body::Body::empty())
                .unwrap();

            match serve_dir.call(test_req).await {
                Ok(response) if response.status().is_success() => {
                    // File exists and was served successfully, return it
                    return response.into_response();
                }
                _ => {
                    // File doesn't exist or error, apply redirect
                }
            }
        }

        // Handle different status codes
        match redirect_match.status {
            200 => {
                // Rewrite: serve the target file but keep the URL the same
                if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
                    let new_req = Request::builder()
                        .uri(target_uri)
                        .method(&method)
                        .body(axum::body::Body::empty())
                        .unwrap();

                    match serve_dir.call(new_req).await {
                        Ok(response) => response.into_response(),
                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
                    }
                } else {
                    StatusCode::INTERNAL_SERVER_ERROR.into_response()
                }
            }
            301 => {
                // Permanent redirect
                Redirect::permanent(&redirect_match.target_path).into_response()
            }
            302 => {
                // Temporary redirect
                Redirect::temporary(&redirect_match.target_path).into_response()
            }
            404 => {
                // Custom 404 page
                if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
                    let new_req = Request::builder()
                        .uri(target_uri)
                        .method(&method)
                        .body(axum::body::Body::empty())
                        .unwrap();

                    match serve_dir.call(new_req).await {
                        Ok(mut response) => {
                            *response.status_mut() = StatusCode::NOT_FOUND;
                            response.into_response()
                        }
                        Err(_) => StatusCode::NOT_FOUND.into_response(),
                    }
                } else {
                    StatusCode::NOT_FOUND.into_response()
                }
            }
            _ => {
                // Unsupported status code, fall through to normal serving
                match serve_dir.call(req).await {
                    Ok(response) => response.into_response(),
                    Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
                }
            }
        }
    } else {
        drop(redirect_rules);
        // No redirect match, serve normally
        match serve_dir.call(req).await {
            Ok(response) => response.into_response(),
            Err(_) => StatusCode::NOT_FOUND.into_response(),
        }
    }
}

/// Watch the firehose for updates to the specific site
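///
/// Returns a boxed future so the function can re-invoke itself recursively to
/// reconnect after stream errors or disconnects. Resolves the DID's PDS,
/// rewrites its URL to a WebSocket scheme, and subscribes to the repo stream.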
fn watch_firehose(
    state: ServerState,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> {
    Box::pin(async move {
        use jacquard_identity::PublicResolver;
        use jacquard::prelude::IdentityResolver;

        // Resolve DID to PDS URL
        let resolver = PublicResolver::default();
        let did = Did::new(&state.did).into_diagnostic()?;
        let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;

        println!("[PDS] Resolved DID to PDS: {}", pds_url);

        // Convert HTTP(S) URL to WebSocket URL
        let mut ws_url = pds_url.clone();
        let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" };
        ws_url
            .set_scheme(scheme)
            .map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?;

        println!("[PDS] Connecting to {}...", ws_url);

        // Create subscription client
        let client = TungsteniteSubscriptionClient::from_base_uri(ws_url);

        // Subscribe to the PDS firehose
        let params = SubscribeRepos::new().build();

        let stream = client.subscribe(&params).await.into_diagnostic()?;
        println!("[PDS] Connected! Watching for updates...");

        // Convert to typed message stream
        let (_sink, mut messages) = stream.into_stream();

        loop {
            match messages.next().await {
                Some(Ok(msg)) => {
                    if let Err(e) = handle_firehose_message(&state, msg).await {
                        eprintln!("[PDS] Error handling message: {}", e);
                    }
                }
                Some(Err(e)) => {
                    eprintln!("[PDS] Stream error: {}", e);
                    // Try to reconnect after a delay
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                    return watch_firehose(state).await;
                }
                None => {
                    println!("[PDS] Stream ended, reconnecting...");
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                    return watch_firehose(state).await;
                }
            }
        }
    })
}

/// Handle a firehose message
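///
/// Ignores commits from other repos and commits whose ops do not touch the
/// `place.wisp.fs/{rkey}` record. For a new commit CID with a create or update
/// op, the site is re-pulled and redirect rules are reloaded; a bare delete
/// only records the CID so the commit is not processed again.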
async fn handle_firehose_message<'a>(
    state: &ServerState,
    msg: SubscribeReposMessage<'a>,
) -> miette::Result<()> {
    match msg {
        SubscribeReposMessage::Commit(commit_msg) => {
            // Check if this commit is from our DID
            if commit_msg.repo.as_str() != state.did.as_str() {
                return Ok(());
            }

            // Check if any operation affects our site
            let target_path = format!("place.wisp.fs/{}", state.rkey);
            let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == target_path);

            if has_site_update {
                // Debug: log the operations in this commit that touch the site record
                println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey);
                for op in &commit_msg.ops {
                    if op.path.as_ref() == target_path {
                        println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref());
                    }
                }

                // Use the commit CID as the version tracker
                let commit_cid = commit_msg.commit.to_string();

                // Check if this is a new commit
                let should_update = {
                    let last_cid = state.last_cid.read().await;
                    Some(commit_cid.clone()) != *last_cid
                };

                if should_update {
                    // Check operation types
                    let has_create_or_update = commit_msg.ops.iter().any(|op| {
                        op.path.as_ref() == target_path
                            && (op.action.as_ref() == "create" || op.action.as_ref() == "update")
                    });
                    let has_delete = commit_msg.ops.iter().any(|op| {
                        op.path.as_ref() == target_path && op.action.as_ref() == "delete"
                    });

                    // If there's a create/update, pull the site (even if the same commit also contains a delete)
                    if has_create_or_update {
                        println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid);
                        println!("[Update] Pulling latest version...");

                        // Pull the updated site
                        match pull_site(
                            state.did.clone(),
                            state.rkey.clone(),
                            state.output_dir.clone(),
                        )
                        .await
                        {
                            Ok(_) => {
                                // Update last CID
                                let mut last_cid = state.last_cid.write().await;
                                *last_cid = Some(commit_cid);

                                // Reload redirect rules
                                let new_redirect_rules = load_redirect_rules(&state.output_dir);
                                let mut redirect_rules = state.redirect_rules.write().await;
                                *redirect_rules = new_redirect_rules;

                                println!("[Update] ✓ Site updated successfully!\n");
                            }
                            Err(e) => {
                                eprintln!("[Update] Failed to pull site: {}", e);
                            }
                        }
                    } else if has_delete {
                        // Only a delete, no create/update
                        println!("\n[Update] Site {} was deleted", state.rkey);

                        // Update last CID so we don't process this commit again
                        let mut last_cid = state.last_cid.write().await;
                        *last_cid = Some(commit_cid);
                    }
                }
            }
        }
        _ => {
            // Ignore identity and account messages
        }
    }

    Ok(())
}