relay filter/appview bootstrap
at main 2.5 kB view raw
1mod broadcast; 2 3pub use broadcast::*; 4 5use std::sync::Arc; 6 7use axum::{ 8 extract::{ 9 ws::{Message, WebSocket, WebSocketUpgrade}, 10 State, 11 }, 12 response::IntoResponse, 13 routing::get, 14 Router, 15}; 16use futures::{SinkExt, StreamExt}; 17 18use crate::AppState; 19 20pub fn router() -> Router<Arc<AppState>> { 21 Router::new().route("/ws", get(ws_handler)) 22} 23 24async fn ws_handler( 25 ws: WebSocketUpgrade, 26 State(state): State<Arc<AppState>>, 27) -> impl IntoResponse { 28 ws.on_upgrade(move |socket| handle_socket(socket, state)) 29} 30 31async fn handle_socket(socket: WebSocket, state: Arc<AppState>) { 32 let (mut sender, mut receiver) = socket.split(); 33 34 let mut rx = state.broadcaster.subscribe(); 35 36 let send_task = tokio::spawn(async move { 37 loop { 38 match rx.recv().await { 39 Ok(msg) => { 40 if sender.send(Message::Text(msg.into())).await.is_err() { 41 break; 42 } 43 } 44 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { 45 tracing::warn!(skipped = n, "WebSocket client lagged, skipped messages"); 46 // Continue receiving - client will miss some messages but stay connected 47 } 48 Err(tokio::sync::broadcast::error::RecvError::Closed) => { 49 tracing::debug!("Broadcast channel closed"); 50 break; 51 } 52 } 53 } 54 }); 55 let send_abort = send_task.abort_handle(); 56 57 let recv_task = tokio::spawn(async move { 58 while let Some(msg) = receiver.next().await { 59 match msg { 60 Ok(Message::Ping(data)) => { 61 tracing::trace!("Received ping, sending pong"); 62 // Note: axum's WebSocket auto-responds to pings, but we log it 63 let _ = data; 64 } 65 Ok(Message::Close(_)) => { 66 tracing::debug!("Client closed connection"); 67 break; 68 } 69 Err(e) => { 70 tracing::warn!(error = %e, "WebSocket error"); 71 break; 72 } 73 _ => {} 74 } 75 } 76 }); 77 let recv_abort = recv_task.abort_handle(); 78 79 tokio::select! { 80 _ = send_task => { 81 recv_abort.abort(); 82 }, 83 _ = recv_task => { 84 send_abort.abort(); 85 }, 86 } 87 88 tracing::debug!("WebSocket connection closed"); 89}