Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
wisp.place
1use crate::pull::pull_site;
2use axum::Router;
3use jacquard::CowStr;
4use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams};
5use jacquard_common::types::string::Did;
6use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
7use miette::IntoDiagnostic;
8use n0_future::StreamExt;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tower_http::compression::CompressionLayer;
13use tower_http::services::ServeDir;
14use url::Url;
15
16/// Shared state for the server
17#[derive(Clone)]
18struct ServerState {
19 did: CowStr<'static>,
20 rkey: CowStr<'static>,
21 output_dir: PathBuf,
22 last_cid: Arc<RwLock<Option<String>>>,
23}
24
25/// Serve a site locally with real-time firehose updates
26pub async fn serve_site(
27 input: CowStr<'static>,
28 rkey: CowStr<'static>,
29 output_dir: PathBuf,
30 port: u16,
31) -> miette::Result<()> {
32 println!("Serving site {} from {} on port {}...", rkey, input, port);
33
34 // Resolve handle to DID if needed
35 use jacquard_identity::PublicResolver;
36 use jacquard::prelude::IdentityResolver;
37
38 let resolver = PublicResolver::default();
39 let did = if input.starts_with("did:") {
40 Did::new(&input).into_diagnostic()?
41 } else {
42 // It's a handle, resolve it
43 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?;
44 resolver.resolve_handle(&handle).await.into_diagnostic()?
45 };
46
47 println!("Resolved to DID: {}", did.as_str());
48
49 // Create output directory if it doesn't exist
50 std::fs::create_dir_all(&output_dir).into_diagnostic()?;
51
52 // Initial pull of the site
53 println!("Performing initial pull...");
54 let did_str = CowStr::from(did.as_str().to_string());
55 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?;
56
57 // Create shared state
58 let state = ServerState {
59 did: did_str.clone(),
60 rkey: rkey.clone(),
61 output_dir: output_dir.clone(),
62 last_cid: Arc::new(RwLock::new(None)),
63 };
64
65 // Start firehose listener in background
66 let firehose_state = state.clone();
67 tokio::spawn(async move {
68 if let Err(e) = watch_firehose(firehose_state).await {
69 eprintln!("Firehose error: {}", e);
70 }
71 });
72
73 // Create HTTP server with gzip compression
74 let app = Router::new()
75 .fallback_service(
76 ServeDir::new(&output_dir)
77 .precompressed_gzip()
78 )
79 .layer(CompressionLayer::new())
80 .with_state(state);
81
82 let addr = format!("0.0.0.0:{}", port);
83 let listener = tokio::net::TcpListener::bind(&addr)
84 .await
85 .into_diagnostic()?;
86
87 println!("\n✓ Server running at http://localhost:{}", port);
88 println!(" Watching for updates on the firehose...\n");
89
90 axum::serve(listener, app).await.into_diagnostic()?;
91
92 Ok(())
93}
94
95/// Watch the firehose for updates to the specific site
96fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> {
97 Box::pin(async move {
98 let jetstream_url = Url::parse("wss://jetstream1.us-east.fire.hose.cam")
99 .into_diagnostic()?;
100
101 println!("[Firehose] Connecting to Jetstream...");
102
103 // Create subscription client
104 let client = TungsteniteSubscriptionClient::from_base_uri(jetstream_url);
105
106 // Subscribe with no filters (we'll filter manually)
107 // Jetstream doesn't support filtering by collection in the params builder
108 let params = JetstreamParams::new().build();
109
110 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
111 println!("[Firehose] Connected! Watching for updates...");
112
113 // Convert to typed message stream
114 let (_sink, mut messages) = stream.into_stream();
115
116 loop {
117 match messages.next().await {
118 Some(Ok(msg)) => {
119 if let Err(e) = handle_firehose_message(&state, msg).await {
120 eprintln!("[Firehose] Error handling message: {}", e);
121 }
122 }
123 Some(Err(e)) => {
124 eprintln!("[Firehose] Stream error: {}", e);
125 // Try to reconnect after a delay
126 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
127 return Box::pin(watch_firehose(state)).await;
128 }
129 None => {
130 println!("[Firehose] Stream ended, reconnecting...");
131 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
132 return Box::pin(watch_firehose(state)).await;
133 }
134 }
135 }
136 })
137}
138
139/// Handle a firehose message
140async fn handle_firehose_message(
141 state: &ServerState,
142 msg: JetstreamMessage<'_>,
143) -> miette::Result<()> {
144 match msg {
145 JetstreamMessage::Commit {
146 did,
147 commit,
148 ..
149 } => {
150 // Check if this is our site
151 if did.as_str() == state.did.as_str()
152 && commit.collection.as_str() == "place.wisp.fs"
153 && commit.rkey.as_str() == state.rkey.as_str()
154 {
155 match commit.operation {
156 CommitOperation::Create | CommitOperation::Update => {
157 let new_cid = commit.cid.as_ref().map(|c| c.to_string());
158
159 // Check if CID changed
160 let should_update = {
161 let last_cid = state.last_cid.read().await;
162 new_cid != *last_cid
163 };
164
165 if should_update {
166 println!("\n[Update] Detected change to site {} (CID: {:?})", state.rkey, new_cid);
167 println!("[Update] Pulling latest version...");
168
169 // Pull the updated site
170 match pull_site(
171 state.did.clone(),
172 state.rkey.clone(),
173 state.output_dir.clone(),
174 )
175 .await
176 {
177 Ok(_) => {
178 // Update last CID
179 let mut last_cid = state.last_cid.write().await;
180 *last_cid = new_cid;
181 println!("[Update] ✓ Site updated successfully!\n");
182 }
183 Err(e) => {
184 eprintln!("[Update] Failed to pull site: {}", e);
185 }
186 }
187 }
188 }
189 CommitOperation::Delete => {
190 println!("\n[Update] Site {} was deleted", state.rkey);
191 }
192 }
193 }
194 }
195 _ => {
196 // Ignore identity and account messages
197 }
198 }
199
200 Ok(())
201}
202