My agentic slop goes here. Not intended for anyone else!
1(** Test conpool with 16 localhost servers on different 127.0.* addresses *)
2
3open Eio.Std
4
5(** Create a simple echo server on a specific address and port *)
6let create_server ~sw ~net ipaddr port connections_ref =
7 let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10
8 (`Tcp (ipaddr, port))
9 in
10
11 Eio.Fiber.fork ~sw (fun () ->
12 try
13 while true do
14 Eio.Net.accept_fork socket ~sw ~on_error:(fun ex ->
15 traceln "Server %a error: %s" Eio.Net.Sockaddr.pp (`Tcp (ipaddr, port))
16 (Printexc.to_string ex)
17 ) (fun flow _addr ->
18 (* Track this connection *)
19 Atomic.incr connections_ref;
20
21 (* Simple protocol: read lines and echo them back, until EOF *)
22 try
23 let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
24 while true do
25 let line = Eio.Buf_read.line buf in
26 traceln "Server on %a:%d received: %s"
27 Eio.Net.Ipaddr.pp ipaddr port line;
28
29 Eio.Flow.copy_string (line ^ "\n") flow
30 done
31 with
32 | End_of_file ->
33 traceln "Server on %a:%d client disconnected"
34 Eio.Net.Ipaddr.pp ipaddr port;
35 Eio.Flow.close flow;
36 Atomic.decr connections_ref
37 | ex ->
38 traceln "Server on %a:%d error handling connection: %s"
39 Eio.Net.Ipaddr.pp ipaddr port
40 (Printexc.to_string ex);
41 Eio.Flow.close flow;
42 Atomic.decr connections_ref
43 )
44 done
45 with Eio.Cancel.Cancelled _ -> ()
46 )
47
48(** Generate 16 different servers on 127.0.0.1 with different ports *)
49let generate_localhost_addresses () =
50 List.init 16 (fun i ->
51 (* Use 127.0.0.1 for all, just different ports *)
52 let addr_str = "127.0.0.1" in
53 (* Create raw IPv4 address as 4 bytes *)
54 let raw_bytes = Bytes.create 4 in
55 Bytes.set raw_bytes 0 (Char.chr 127);
56 Bytes.set raw_bytes 1 (Char.chr 0);
57 Bytes.set raw_bytes 2 (Char.chr 0);
58 Bytes.set raw_bytes 3 (Char.chr 1);
59 let addr = Eio.Net.Ipaddr.of_raw (Bytes.to_string raw_bytes) in
60 (addr_str, addr, 10000 + i)
61 )
62
63let () =
64 (* Setup logging *)
65 Logs.set_reporter (Logs_fmt.reporter ());
66 Logs.set_level (Some Logs.Info);
67 Logs.Src.set_level Conpool.src (Some Logs.Debug);
68
69 Eio_main.run @@ fun env ->
70 Switch.run @@ fun sw ->
71
72 traceln "=== Starting 16 localhost servers ===";
73
74 (* Generate addresses *)
75 let servers = generate_localhost_addresses () in
76
77 (* Create connection counters for each server *)
78 let connection_refs = List.map (fun _ -> Atomic.make 0) servers in
79
80 (* Start all servers *)
81 List.iter2 (fun (_addr_str, addr, port) conn_ref ->
82 traceln "Starting server on %a:%d"
83 Eio.Net.Ipaddr.pp addr port;
84 create_server ~sw ~net:env#net addr port conn_ref
85 ) servers connection_refs;
86
87 (* Give servers time to start *)
88 Eio.Time.sleep env#clock 0.5;
89
90 traceln "\n=== Creating connection pool ===";
91
92 (* Create connection pool *)
93 let pool_config = Conpool.Config.make
94 ~max_connections_per_endpoint:5
95 ~max_idle_time:30.0
96 ~max_connection_lifetime:60.0
97 ()
98 in
99
100 let pool = Conpool.create
101 ~sw
102 ~net:env#net
103 ~clock:env#clock
104 ~config:pool_config
105 ()
106 in
107
108 traceln "\n=== Stress testing with thousands of concurrent connections ===";
109
110 (* Disable debug logging for stress test *)
111 Logs.Src.set_level Conpool.src (Some Logs.Info);
112
113 (* Create endpoints for all servers *)
114 let endpoints = List.map (fun (addr_str, _addr, port) ->
115 Conpool.Endpoint.make ~host:addr_str ~port
116 ) servers in
117
118 (* Stress test: thousands of concurrent requests across all 16 servers *)
119 let num_requests = 50000 in
120
121 traceln "Launching %d concurrent requests across %d endpoints..."
122 num_requests (List.length endpoints);
123 traceln "Pool config: max %d connections per endpoint"
124 (Conpool.Config.max_connections_per_endpoint pool_config);
125
126 let start_time = Unix.gettimeofday () in
127 let success_count = Atomic.make 0 in
128 let error_count = Atomic.make 0 in
129 let last_progress = ref 0 in
130
131 (* Generate list of (endpoint, request_id) pairs *)
132 let tasks = List.init num_requests (fun i ->
133 let endpoint = List.nth endpoints (i mod List.length endpoints) in
134 (endpoint, i)
135 ) in
136
137 (* Run all requests concurrently with fiber limit *)
138 Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) ->
139 try
140 Conpool.with_connection pool endpoint (fun flow ->
141 let test_msg = Printf.sprintf "Request %d" req_id in
142 Eio.Flow.copy_string (test_msg ^ "\n") flow;
143
144 let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
145 let _response = Eio.Buf_read.line buf in
146 let count = Atomic.fetch_and_add success_count 1 + 1 in
147
148 (* Progress indicator every 5000 requests *)
149 if count / 5000 > !last_progress then begin
150 last_progress := count / 5000;
151 traceln " Progress: %d/%d (%.1f%%)"
152 count num_requests
153 (100.0 *. float_of_int count /. float_of_int num_requests)
154 end
155 )
156 with e ->
157 Atomic.incr error_count;
158 if Atomic.get error_count <= 10 then
159 traceln "Request %d to %a failed: %s"
160 req_id Conpool.Endpoint.pp endpoint (Printexc.to_string e)
161 ) tasks;
162
163 let end_time = Unix.gettimeofday () in
164 let duration = end_time -. start_time in
165 let successful = Atomic.get success_count in
166 let failed = Atomic.get error_count in
167
168 traceln "\n=== Stress test results ===";
169 traceln "Total requests: %d" num_requests;
170 traceln "Successful: %d" successful;
171 traceln "Failed: %d" failed;
172 traceln "Duration: %.2fs" duration;
173 traceln "Throughput: %.0f req/s" (float_of_int successful /. duration);
174 traceln "Average latency: %.2fms" (duration *. 1000.0 /. float_of_int successful);
175
176 traceln "\n=== Connection pool statistics ===";
177 let all_stats = Conpool.all_stats pool in
178
179 (* Calculate totals *)
180 let total_created = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_created s) 0 all_stats in
181 let total_reused = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_reused s) 0 all_stats in
182 let total_closed = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_closed s) 0 all_stats in
183 let total_errors = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.errors s) 0 all_stats in
184
185 traceln "Total connections created: %d" total_created;
186 traceln "Total connections reused: %d" total_reused;
187 traceln "Total connections closed: %d" total_closed;
188 traceln "Total errors: %d" total_errors;
189 traceln "Connection reuse ratio: %.2fx (reused/created)"
190 (if total_created > 0 then float_of_int total_reused /. float_of_int total_created else 0.0);
191 traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
192 (if successful > 0 then 100.0 *. float_of_int total_reused /. float_of_int successful else 0.0)
193 total_reused;
194
195 traceln "\nPer-endpoint breakdown:";
196 List.iter (fun (endpoint, stats) ->
197 traceln " %a: created=%d reused=%d active=%d idle=%d"
198 Conpool.Endpoint.pp endpoint
199 (Conpool.Stats.total_created stats)
200 (Conpool.Stats.total_reused stats)
201 (Conpool.Stats.active stats)
202 (Conpool.Stats.idle stats)
203 ) all_stats;
204
205 traceln "\n=== Verifying server-side connection counts ===";
206 List.iter2 (fun (addr_str, _addr, port) conn_ref ->
207 let count = Atomic.get conn_ref in
208 traceln "Server %s:%d - Active connections: %d" addr_str port count
209 ) servers connection_refs;
210
211 traceln "\n=== Test completed successfully ==="