1use bincode::{Decode, Encode};
2
3#[cfg(any(feature = "client", feature = "server"))]
4const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard();
5
6#[derive(Debug, Clone, Copy, Encode, Decode)]
7pub enum WsMessage {
8 Laser(LaserMessage),
9 Mouse(MouseMessage),
10}
11
12#[derive(Debug, Clone, Copy, Encode, Decode)]
13pub struct LaserMessage {
14 pub x: u32,
15 pub y: u32,
16 pub id: u64,
17 pub line_id: u8,
18}
19
20#[derive(Debug, Clone, Copy, Encode, Decode)]
21pub struct MouseMessage {
22 pub x: u32,
23 pub y: u32,
24}
25
26#[cfg(feature = "client")]
27pub mod client {
28 use futures_util::{SinkExt, StreamExt};
29 use tokio::sync::mpsc;
30 use tokio_tungstenite_wasm::Message;
31
32 use crate::{
33 AppResult, WindowHandle,
34 ws::{BINCODE_CFG, WsMessage},
35 };
36
37 pub async fn connect(
38 server_url: &str,
39 window: WindowHandle,
40 mut overlay_rx: mpsc::Receiver<WsMessage>,
41 overlay_tx: mpsc::Sender<WsMessage>,
42 id: u64,
43 ) -> AppResult<()> {
44 let (mut tx, mut rx) = tokio_tungstenite_wasm::connect(server_url).await?.split();
45 let send_task = async move {
46 while let Some(msg) = overlay_rx.recv().await {
47 let Ok(encoded) = bincode::encode_to_vec(&msg, BINCODE_CFG) else {
48 continue;
49 };
50 let _ = tx.send(Message::Binary(encoded.into())).await;
51 }
52 };
53 let receive_task = async move {
54 while let Some(ev) = rx.next().await {
55 match ev {
56 Ok(Message::Binary(payload)) => {
57 let Ok((decoded, _)): Result<(WsMessage, _), _> =
58 bincode::decode_from_slice(&payload, BINCODE_CFG)
59 else {
60 continue;
61 };
62 let _ = overlay_tx.send(decoded).await;
63 if let Some(window) = window.get() {
64 window.request_redraw();
65 }
66 }
67 Ok(Message::Close(_)) => {
68 eprintln!("server closed connection");
69 if let Some(window) = window.get() {
70 window.set_title("offline");
71 }
72 break;
73 }
74 Err(err) => {
75 eprintln!("error receiving message: {}", err);
76 if let Some(window) = window.get() {
77 window.set_title(&format!("error: {}", err));
78 }
79 }
80 _ => {}
81 }
82 }
83 };
84 let _ = futures_util::future::join(send_task, receive_task).await;
85 Ok(())
86 }
87}
88
89#[cfg(feature = "server")]
90pub mod server {
91 use std::net::SocketAddr;
92
93 use futures_util::{SinkExt, StreamExt};
94 use tokio::{
95 net::{TcpListener, TcpStream},
96 sync::{broadcast, mpsc},
97 };
98 use tokio_websockets::Message;
99
100 use crate::{
101 AppResult, WindowHandle,
102 ws::{BINCODE_CFG, WsMessage},
103 };
104
105 pub async fn listen(
106 port: u16,
107 window: WindowHandle,
108 overlay_tx: mpsc::Sender<WsMessage>,
109 ) -> AppResult<(impl Future, broadcast::Sender<(u64, WsMessage)>)> {
110 let addr = SocketAddr::from(([0, 0, 0, 0], port));
111 let listener = TcpListener::bind(&addr).await?;
112 println!("listening on {}", addr);
113
114 let (tx, mut rx) = broadcast::channel(1024);
115
116 let server_task = tokio::spawn({
117 let tx = tx.clone();
118 async move {
119 loop {
120 let conn = match listener.accept().await {
121 Ok((conn, addr)) => {
122 println!("accepted connection from {}", addr);
123 conn
124 }
125 Err(err) => {
126 eprintln!("error accepting connection: {}", err);
127 continue;
128 }
129 };
130 tokio::spawn(handle_server_conn(conn, tx.clone(), tx.subscribe()));
131 }
132 }
133 });
134 let overlay_task = tokio::spawn(async move {
135 while let Ok((_, msg)) = rx.recv().await {
136 let _ = overlay_tx.send(msg).await;
137 if let Some(window) = window.get() {
138 window.request_redraw();
139 }
140 }
141 });
142
143 Ok((futures_util::future::join(server_task, overlay_task), tx))
144 }
145
146 async fn handle_server_conn(
147 conn: TcpStream,
148 msg_tx: broadcast::Sender<(u64, WsMessage)>,
149 mut msg_rx: broadcast::Receiver<(u64, WsMessage)>,
150 ) -> impl Future {
151 let (_, server) = tokio_websockets::ServerBuilder::new()
152 .accept(conn)
153 .await
154 .unwrap();
155 let (mut tx, mut rx) = server.split();
156
157 let id = fastrand::u64(..);
158
159 let send_task = tokio::spawn(async move {
160 while let Some(msg) = rx.next().await {
161 match msg {
162 Ok(msg) => {
163 let payload = msg.into_payload();
164 let Ok((decoded, _)) = bincode::decode_from_slice(&payload, BINCODE_CFG)
165 else {
166 continue;
167 };
168 let _ = msg_tx.send((id, decoded));
169 }
170 Err(err) => {
171 eprintln!("error receiving message: {}", err);
172 }
173 }
174 }
175 });
176 let receive_task = tokio::spawn(async move {
177 while let Ok((msg_id, msg)) = msg_rx.recv().await {
178 // skip if message is from self
179 if msg_id == id {
180 continue;
181 }
182 let Ok(encoded) = bincode::encode_to_vec(&msg, BINCODE_CFG) else {
183 continue;
184 };
185 let _ = tx.send(Message::binary(encoded)).await;
186 }
187 });
188
189 futures_util::future::join(send_task, receive_task)
190 }
191}