···
use crate::pull::pull_site;
2
+
use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule};
6
+
response::{Response, IntoResponse, Redirect},
7
+
http::{StatusCode, Uri},
4
-
use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams};
10
+
use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
use jacquard_common::types::string::Did;
use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
use miette::IntoDiagnostic;
use n0_future::StreamExt;
15
+
use std::collections::HashMap;
use tower_http::compression::CompressionLayer;
use tower_http::services::ServeDir;
/// Shared state for the server
···
last_cid: Arc<RwLock<Option<String>>>,
30
+
redirect_rules: Arc<RwLock<Vec<RedirectRule>>>,
/// Serve a site locally with real-time firehose updates
···
let did_str = CowStr::from(did.as_str().to_string());
pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?;
65
+
// Load redirect rules
66
+
let redirect_rules = load_redirect_rules(&output_dir);
67
+
if !redirect_rules.is_empty() {
68
+
println!("Loaded {} redirect rules from _redirects", redirect_rules.len());
let state = ServerState {
output_dir: output_dir.clone(),
last_cid: Arc::new(RwLock::new(None)),
77
+
redirect_rules: Arc::new(RwLock::new(redirect_rules)),
// Start firehose listener in background
···
73
-
// Create HTTP server with gzip compression
88
+
// Create HTTP server with gzip compression and redirect handling
89
+
let serve_dir = ServeDir::new(&output_dir).precompressed_gzip();
76
-
ServeDir::new(&output_dir)
77
-
.precompressed_gzip()
79
-
.layer(CompressionLayer::new())
92
+
.fallback(move |req: Request| {
93
+
let state = state.clone();
94
+
let mut serve_dir = serve_dir.clone();
96
+
handle_request_with_redirects(req, state, &mut serve_dir).await
99
+
.layer(CompressionLayer::new());
let addr = format!("0.0.0.0:{}", port);
let listener = tokio::net::TcpListener::bind(&addr)
···
axum::serve(listener, app).await.into_diagnostic()?;
114
+
/// Handle a request with redirect support
115
+
async fn handle_request_with_redirects(
117
+
state: ServerState,
118
+
serve_dir: &mut ServeDir,
120
+
let uri = req.uri().clone();
121
+
let path = uri.path();
122
+
let method = req.method().clone();
124
+
// Parse query parameters
125
+
let query_params = uri.query().map(|q| {
126
+
let mut params = HashMap::new();
127
+
for pair in q.split('&') {
128
+
if let Some((key, value)) = pair.split_once('=') {
129
+
params.insert(key.to_string(), value.to_string());
135
+
// Check for redirect rules
136
+
let redirect_rules = state.redirect_rules.read().await;
137
+
if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) {
138
+
let is_force = redirect_match.force;
139
+
drop(redirect_rules); // Release the lock
141
+
// If not forced, check if the file exists first
143
+
// Try to serve the file normally first
144
+
let test_req = Request::builder()
147
+
.body(axum::body::Body::empty())
150
+
match serve_dir.call(test_req).await {
151
+
Ok(response) if response.status().is_success() => {
152
+
// File exists and was served successfully, return it
153
+
return response.into_response();
156
+
// File doesn't exist or error, apply redirect
161
+
// Handle different status codes
162
+
match redirect_match.status {
164
+
// Rewrite: serve the target file but keep the URL the same
165
+
if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
166
+
let new_req = Request::builder()
169
+
.body(axum::body::Body::empty())
172
+
match serve_dir.call(new_req).await {
173
+
Ok(response) => response.into_response(),
174
+
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
177
+
StatusCode::INTERNAL_SERVER_ERROR.into_response()
181
+
// Permanent redirect
182
+
Redirect::permanent(&redirect_match.target_path).into_response()
185
+
// Temporary redirect
186
+
Redirect::temporary(&redirect_match.target_path).into_response()
190
+
if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() {
191
+
let new_req = Request::builder()
194
+
.body(axum::body::Body::empty())
197
+
match serve_dir.call(new_req).await {
198
+
Ok(mut response) => {
199
+
*response.status_mut() = StatusCode::NOT_FOUND;
200
+
response.into_response()
202
+
Err(_) => StatusCode::NOT_FOUND.into_response(),
205
+
StatusCode::NOT_FOUND.into_response()
209
+
// Unsupported status code, fall through to normal serving
210
+
match serve_dir.call(req).await {
211
+
Ok(response) => response.into_response(),
212
+
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
217
+
drop(redirect_rules);
218
+
// No redirect match, serve normally
219
+
match serve_dir.call(req).await {
220
+
Ok(response) => response.into_response(),
221
+
Err(_) => StatusCode::NOT_FOUND.into_response(),
/// Watch the firehose for updates to the specific site
fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> {
98
-
let jetstream_url = Url::parse("wss://jetstream1.us-east.fire.hose.cam")
99
-
.into_diagnostic()?;
229
+
use jacquard_identity::PublicResolver;
230
+
use jacquard::prelude::IdentityResolver;
232
+
// Resolve DID to PDS URL
233
+
let resolver = PublicResolver::default();
234
+
let did = Did::new(&state.did).into_diagnostic()?;
235
+
let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?;
237
+
println!("[PDS] Resolved DID to PDS: {}", pds_url);
239
+
// Convert HTTP(S) URL to WebSocket URL
240
+
let mut ws_url = pds_url.clone();
241
+
let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" };
242
+
ws_url.set_scheme(scheme)
243
+
.map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?;
101
-
println!("[Firehose] Connecting to Jetstream...");
245
+
println!("[PDS] Connecting to {}...", ws_url);
// Create subscription client
104
-
let client = TungsteniteSubscriptionClient::from_base_uri(jetstream_url);
248
+
let client = TungsteniteSubscriptionClient::from_base_uri(ws_url);
106
-
// Subscribe with no filters (we'll filter manually)
107
-
// Jetstream doesn't support filtering by collection in the params builder
108
-
let params = JetstreamParams::new().build();
250
+
// Subscribe to the PDS firehose
251
+
let params = SubscribeRepos::new().build();
let stream = client.subscribe(¶ms).await.into_diagnostic()?;
111
-
println!("[Firehose] Connected! Watching for updates...");
254
+
println!("[PDS] Connected! Watching for updates...");
// Convert to typed message stream
let (_sink, mut messages) = stream.into_stream();
···
match messages.next().await {
if let Err(e) = handle_firehose_message(&state, msg).await {
120
-
eprintln!("[Firehose] Error handling message: {}", e);
263
+
eprintln!("[PDS] Error handling message: {}", e);
124
-
eprintln!("[Firehose] Stream error: {}", e);
267
+
eprintln!("[PDS] Stream error: {}", e);
// Try to reconnect after a delay
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
return Box::pin(watch_firehose(state)).await;
130
-
println!("[Firehose] Stream ended, reconnecting...");
273
+
println!("[PDS] Stream ended, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
return Box::pin(watch_firehose(state)).await;
···
/// Handle a firehose message
140
-
async fn handle_firehose_message(
283
+
async fn handle_firehose_message<'a>(
142
-
msg: JetstreamMessage<'_>,
285
+
msg: SubscribeReposMessage<'a>,
) -> miette::Result<()> {
145
-
JetstreamMessage::Commit {
150
-
// Check if this is our site
151
-
if did.as_str() == state.did.as_str()
152
-
&& commit.collection.as_str() == "place.wisp.fs"
153
-
&& commit.rkey.as_str() == state.rkey.as_str()
155
-
match commit.operation {
156
-
CommitOperation::Create | CommitOperation::Update => {
157
-
let new_cid = commit.cid.as_ref().map(|c| c.to_string());
159
-
// Check if CID changed
160
-
let should_update = {
161
-
let last_cid = state.last_cid.read().await;
162
-
new_cid != *last_cid
288
+
SubscribeReposMessage::Commit(commit_msg) => {
289
+
// Check if this commit is from our DID
290
+
if commit_msg.repo.as_str() != state.did.as_str() {
166
-
println!("\n[Update] Detected change to site {} (CID: {:?})", state.rkey, new_cid);
167
-
println!("[Update] Pulling latest version...");
294
+
// Check if any operation affects our site
295
+
let target_path = format!("place.wisp.fs/{}", state.rkey);
296
+
let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == target_path);
169
-
// Pull the updated site
172
-
state.rkey.clone(),
173
-
state.output_dir.clone(),
179
-
let mut last_cid = state.last_cid.write().await;
180
-
*last_cid = new_cid;
181
-
println!("[Update] ✓ Site updated successfully!\n");
184
-
eprintln!("[Update] Failed to pull site: {}", e);
298
+
if has_site_update {
299
+
// Debug: log all operations for this commit
300
+
println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey);
301
+
for op in &commit_msg.ops {
302
+
if op.path.as_ref() == target_path {
303
+
println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref());
308
+
if has_site_update {
309
+
// Use the commit CID as the version tracker
310
+
let commit_cid = commit_msg.commit.to_string();
312
+
// Check if this is a new commit
313
+
let should_update = {
314
+
let last_cid = state.last_cid.read().await;
315
+
Some(commit_cid.clone()) != *last_cid
319
+
// Check operation types
320
+
let has_create_or_update = commit_msg.ops.iter().any(|op| {
321
+
op.path.as_ref() == target_path &&
322
+
(op.action.as_ref() == "create" || op.action.as_ref() == "update")
324
+
let has_delete = commit_msg.ops.iter().any(|op| {
325
+
op.path.as_ref() == target_path && op.action.as_ref() == "delete"
328
+
// If there's a create/update, pull the site (even if there's also a delete in the same commit)
329
+
if has_create_or_update {
330
+
println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid);
331
+
println!("[Update] Pulling latest version...");
333
+
// Pull the updated site
336
+
state.rkey.clone(),
337
+
state.output_dir.clone(),
343
+
let mut last_cid = state.last_cid.write().await;
344
+
*last_cid = Some(commit_cid);
346
+
// Reload redirect rules
347
+
let new_redirect_rules = load_redirect_rules(&state.output_dir);
348
+
let mut redirect_rules = state.redirect_rules.write().await;
349
+
*redirect_rules = new_redirect_rules;
351
+
println!("[Update] ✓ Site updated successfully!\n");
354
+
eprintln!("[Update] Failed to pull site: {}", e);
189
-
CommitOperation::Delete => {
357
+
} else if has_delete {
358
+
// Only a delete, no create/update
println!("\n[Update] Site {} was deleted", state.rkey);
361
+
// Update last CID so we don't process this commit again
362
+
let mut last_cid = state.last_cid.write().await;
363
+
*last_cid = Some(commit_cid);