relay filter/appview bootstrap
1mod broadcast;
2
3pub use broadcast::*;
4
5use std::sync::Arc;
6
7use axum::{
8 extract::{
9 ws::{Message, WebSocket, WebSocketUpgrade},
10 State,
11 },
12 response::IntoResponse,
13 routing::get,
14 Router,
15};
16use futures::{SinkExt, StreamExt};
17
18use crate::AppState;
19
20pub fn router() -> Router<Arc<AppState>> {
21 Router::new().route("/ws", get(ws_handler))
22}
23
24async fn ws_handler(
25 ws: WebSocketUpgrade,
26 State(state): State<Arc<AppState>>,
27) -> impl IntoResponse {
28 ws.on_upgrade(move |socket| handle_socket(socket, state))
29}
30
31async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
32 let (mut sender, mut receiver) = socket.split();
33
34 let mut rx = state.broadcaster.subscribe();
35
36 let send_task = tokio::spawn(async move {
37 loop {
38 match rx.recv().await {
39 Ok(msg) => {
40 if sender.send(Message::Text(msg.into())).await.is_err() {
41 break;
42 }
43 }
44 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
45 tracing::warn!(skipped = n, "WebSocket client lagged, skipped messages");
46 // Continue receiving - client will miss some messages but stay connected
47 }
48 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
49 tracing::debug!("Broadcast channel closed");
50 break;
51 }
52 }
53 }
54 });
55 let send_abort = send_task.abort_handle();
56
57 let recv_task = tokio::spawn(async move {
58 while let Some(msg) = receiver.next().await {
59 match msg {
60 Ok(Message::Ping(data)) => {
61 tracing::trace!("Received ping, sending pong");
62 // Note: axum's WebSocket auto-responds to pings, but we log it
63 let _ = data;
64 }
65 Ok(Message::Close(_)) => {
66 tracing::debug!("Client closed connection");
67 break;
68 }
69 Err(e) => {
70 tracing::warn!(error = %e, "WebSocket error");
71 break;
72 }
73 _ => {}
74 }
75 }
76 });
77 let recv_abort = recv_task.abort_handle();
78
79 tokio::select! {
80 _ = send_task => {
81 recv_abort.abort();
82 },
83 _ = recv_task => {
84 send_abort.abort();
85 },
86 }
87
88 tracing::debug!("WebSocket connection closed");
89}