Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place
at redirects 7.1 kB view raw
1use crate::pull::pull_site; 2use axum::Router; 3use jacquard::CowStr; 4use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 5use jacquard_common::types::string::Did; 6use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 7use miette::IntoDiagnostic; 8use n0_future::StreamExt; 9use std::path::PathBuf; 10use std::sync::Arc; 11use tokio::sync::RwLock; 12use tower_http::compression::CompressionLayer; 13use tower_http::services::ServeDir; 14use url::Url; 15 16/// Shared state for the server 17#[derive(Clone)] 18struct ServerState { 19 did: CowStr<'static>, 20 rkey: CowStr<'static>, 21 output_dir: PathBuf, 22 last_cid: Arc<RwLock<Option<String>>>, 23} 24 25/// Serve a site locally with real-time firehose updates 26pub async fn serve_site( 27 input: CowStr<'static>, 28 rkey: CowStr<'static>, 29 output_dir: PathBuf, 30 port: u16, 31) -> miette::Result<()> { 32 println!("Serving site {} from {} on port {}...", rkey, input, port); 33 34 // Resolve handle to DID if needed 35 use jacquard_identity::PublicResolver; 36 use jacquard::prelude::IdentityResolver; 37 38 let resolver = PublicResolver::default(); 39 let did = if input.starts_with("did:") { 40 Did::new(&input).into_diagnostic()? 41 } else { 42 // It's a handle, resolve it 43 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?; 44 resolver.resolve_handle(&handle).await.into_diagnostic()? 45 }; 46 47 println!("Resolved to DID: {}", did.as_str()); 48 49 // Create output directory if it doesn't exist 50 std::fs::create_dir_all(&output_dir).into_diagnostic()?; 51 52 // Initial pull of the site 53 println!("Performing initial pull..."); 54 let did_str = CowStr::from(did.as_str().to_string()); 55 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?; 56 57 // Create shared state 58 let state = ServerState { 59 did: did_str.clone(), 60 rkey: rkey.clone(), 61 output_dir: output_dir.clone(), 62 last_cid: Arc::new(RwLock::new(None)), 63 }; 64 65 // Start firehose listener in background 66 let firehose_state = state.clone(); 67 tokio::spawn(async move { 68 if let Err(e) = watch_firehose(firehose_state).await { 69 eprintln!("Firehose error: {}", e); 70 } 71 }); 72 73 // Create HTTP server with gzip compression 74 let app = Router::new() 75 .fallback_service( 76 ServeDir::new(&output_dir) 77 .precompressed_gzip() 78 ) 79 .layer(CompressionLayer::new()) 80 .with_state(state); 81 82 let addr = format!("0.0.0.0:{}", port); 83 let listener = tokio::net::TcpListener::bind(&addr) 84 .await 85 .into_diagnostic()?; 86 87 println!("\n✓ Server running at http://localhost:{}", port); 88 println!(" Watching for updates on the firehose...\n"); 89 90 axum::serve(listener, app).await.into_diagnostic()?; 91 92 Ok(()) 93} 94 95/// Watch the firehose for updates to the specific site 96fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> { 97 Box::pin(async move { 98 let jetstream_url = Url::parse("wss://jetstream1.us-east.fire.hose.cam") 99 .into_diagnostic()?; 100 101 println!("[Firehose] Connecting to Jetstream..."); 102 103 // Create subscription client 104 let client = TungsteniteSubscriptionClient::from_base_uri(jetstream_url); 105 106 // Subscribe with no filters (we'll filter manually) 107 // Jetstream doesn't support filtering by collection in the params builder 108 let params = JetstreamParams::new().build(); 109 110 let stream = client.subscribe(&params).await.into_diagnostic()?; 111 println!("[Firehose] Connected! Watching for updates..."); 112 113 // Convert to typed message stream 114 let (_sink, mut messages) = stream.into_stream(); 115 116 loop { 117 match messages.next().await { 118 Some(Ok(msg)) => { 119 if let Err(e) = handle_firehose_message(&state, msg).await { 120 eprintln!("[Firehose] Error handling message: {}", e); 121 } 122 } 123 Some(Err(e)) => { 124 eprintln!("[Firehose] Stream error: {}", e); 125 // Try to reconnect after a delay 126 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 127 return Box::pin(watch_firehose(state)).await; 128 } 129 None => { 130 println!("[Firehose] Stream ended, reconnecting..."); 131 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 132 return Box::pin(watch_firehose(state)).await; 133 } 134 } 135 } 136 }) 137} 138 139/// Handle a firehose message 140async fn handle_firehose_message( 141 state: &ServerState, 142 msg: JetstreamMessage<'_>, 143) -> miette::Result<()> { 144 match msg { 145 JetstreamMessage::Commit { 146 did, 147 commit, 148 .. 149 } => { 150 // Check if this is our site 151 if did.as_str() == state.did.as_str() 152 && commit.collection.as_str() == "place.wisp.fs" 153 && commit.rkey.as_str() == state.rkey.as_str() 154 { 155 match commit.operation { 156 CommitOperation::Create | CommitOperation::Update => { 157 let new_cid = commit.cid.as_ref().map(|c| c.to_string()); 158 159 // Check if CID changed 160 let should_update = { 161 let last_cid = state.last_cid.read().await; 162 new_cid != *last_cid 163 }; 164 165 if should_update { 166 println!("\n[Update] Detected change to site {} (CID: {:?})", state.rkey, new_cid); 167 println!("[Update] Pulling latest version..."); 168 169 // Pull the updated site 170 match pull_site( 171 state.did.clone(), 172 state.rkey.clone(), 173 state.output_dir.clone(), 174 ) 175 .await 176 { 177 Ok(_) => { 178 // Update last CID 179 let mut last_cid = state.last_cid.write().await; 180 *last_cid = new_cid; 181 println!("[Update] ✓ Site updated successfully!\n"); 182 } 183 Err(e) => { 184 eprintln!("[Update] Failed to pull site: {}", e); 185 } 186 } 187 } 188 } 189 CommitOperation::Delete => { 190 println!("\n[Update] Site {} was deleted", state.rkey); 191 } 192 } 193 } 194 } 195 _ => { 196 // Ignore identity and account messages 197 } 198 } 199 200 Ok(()) 201} 202