My agentic slop goes here. Not intended for anyone else!
at jsont 7.6 kB view raw
1(** Test conpool with 16 localhost servers on different 127.0.* addresses *) 2 3open Eio.Std 4 5(** Create a simple echo server on a specific address and port *) 6let create_server ~sw ~net ipaddr port connections_ref = 7 let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 8 (`Tcp (ipaddr, port)) 9 in 10 11 Eio.Fiber.fork ~sw (fun () -> 12 try 13 while true do 14 Eio.Net.accept_fork socket ~sw ~on_error:(fun ex -> 15 traceln "Server %a error: %s" Eio.Net.Sockaddr.pp (`Tcp (ipaddr, port)) 16 (Printexc.to_string ex) 17 ) (fun flow _addr -> 18 (* Track this connection *) 19 Atomic.incr connections_ref; 20 21 (* Simple protocol: read lines and echo them back, until EOF *) 22 try 23 let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in 24 while true do 25 let line = Eio.Buf_read.line buf in 26 traceln "Server on %a:%d received: %s" 27 Eio.Net.Ipaddr.pp ipaddr port line; 28 29 Eio.Flow.copy_string (line ^ "\n") flow 30 done 31 with 32 | End_of_file -> 33 traceln "Server on %a:%d client disconnected" 34 Eio.Net.Ipaddr.pp ipaddr port; 35 Eio.Flow.close flow; 36 Atomic.decr connections_ref 37 | ex -> 38 traceln "Server on %a:%d error handling connection: %s" 39 Eio.Net.Ipaddr.pp ipaddr port 40 (Printexc.to_string ex); 41 Eio.Flow.close flow; 42 Atomic.decr connections_ref 43 ) 44 done 45 with Eio.Cancel.Cancelled _ -> () 46 ) 47 48(** Generate 16 different servers on 127.0.0.1 with different ports *) 49let generate_localhost_addresses () = 50 List.init 16 (fun i -> 51 (* Use 127.0.0.1 for all, just different ports *) 52 let addr_str = "127.0.0.1" in 53 (* Create raw IPv4 address as 4 bytes *) 54 let raw_bytes = Bytes.create 4 in 55 Bytes.set raw_bytes 0 (Char.chr 127); 56 Bytes.set raw_bytes 1 (Char.chr 0); 57 Bytes.set raw_bytes 2 (Char.chr 0); 58 Bytes.set raw_bytes 3 (Char.chr 1); 59 let addr = Eio.Net.Ipaddr.of_raw (Bytes.to_string raw_bytes) in 60 (addr_str, addr, 10000 + i) 61 ) 62 63let () = 64 (* Setup logging *) 65 Logs.set_reporter (Logs_fmt.reporter ()); 66 Logs.set_level (Some Logs.Info); 67 Logs.Src.set_level Conpool.src (Some Logs.Debug); 68 69 Eio_main.run @@ fun env -> 70 Switch.run @@ fun sw -> 71 72 traceln "=== Starting 16 localhost servers ==="; 73 74 (* Generate addresses *) 75 let servers = generate_localhost_addresses () in 76 77 (* Create connection counters for each server *) 78 let connection_refs = List.map (fun _ -> Atomic.make 0) servers in 79 80 (* Start all servers *) 81 List.iter2 (fun (_addr_str, addr, port) conn_ref -> 82 traceln "Starting server on %a:%d" 83 Eio.Net.Ipaddr.pp addr port; 84 create_server ~sw ~net:env#net addr port conn_ref 85 ) servers connection_refs; 86 87 (* Give servers time to start *) 88 Eio.Time.sleep env#clock 0.5; 89 90 traceln "\n=== Creating connection pool ==="; 91 92 (* Create connection pool *) 93 let pool_config = Conpool.Config.make 94 ~max_connections_per_endpoint:5 95 ~max_idle_time:30.0 96 ~max_connection_lifetime:60.0 97 () 98 in 99 100 let pool = Conpool.create 101 ~sw 102 ~net:env#net 103 ~clock:env#clock 104 ~config:pool_config 105 () 106 in 107 108 traceln "\n=== Stress testing with thousands of concurrent connections ==="; 109 110 (* Disable debug logging for stress test *) 111 Logs.Src.set_level Conpool.src (Some Logs.Info); 112 113 (* Create endpoints for all servers *) 114 let endpoints = List.map (fun (addr_str, _addr, port) -> 115 Conpool.Endpoint.make ~host:addr_str ~port 116 ) servers in 117 118 (* Stress test: thousands of concurrent requests across all 16 servers *) 119 let num_requests = 50000 in 120 121 traceln "Launching %d concurrent requests across %d endpoints..." 122 num_requests (List.length endpoints); 123 traceln "Pool config: max %d connections per endpoint" 124 (Conpool.Config.max_connections_per_endpoint pool_config); 125 126 let start_time = Unix.gettimeofday () in 127 let success_count = Atomic.make 0 in 128 let error_count = Atomic.make 0 in 129 let last_progress = ref 0 in 130 131 (* Generate list of (endpoint, request_id) pairs *) 132 let tasks = List.init num_requests (fun i -> 133 let endpoint = List.nth endpoints (i mod List.length endpoints) in 134 (endpoint, i) 135 ) in 136 137 (* Run all requests concurrently with fiber limit *) 138 Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) -> 139 try 140 Conpool.with_connection pool endpoint (fun flow -> 141 let test_msg = Printf.sprintf "Request %d" req_id in 142 Eio.Flow.copy_string (test_msg ^ "\n") flow; 143 144 let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in 145 let _response = Eio.Buf_read.line buf in 146 let count = Atomic.fetch_and_add success_count 1 + 1 in 147 148 (* Progress indicator every 5000 requests *) 149 if count / 5000 > !last_progress then begin 150 last_progress := count / 5000; 151 traceln " Progress: %d/%d (%.1f%%)" 152 count num_requests 153 (100.0 *. float_of_int count /. float_of_int num_requests) 154 end 155 ) 156 with e -> 157 Atomic.incr error_count; 158 if Atomic.get error_count <= 10 then 159 traceln "Request %d to %a failed: %s" 160 req_id Conpool.Endpoint.pp endpoint (Printexc.to_string e) 161 ) tasks; 162 163 let end_time = Unix.gettimeofday () in 164 let duration = end_time -. start_time in 165 let successful = Atomic.get success_count in 166 let failed = Atomic.get error_count in 167 168 traceln "\n=== Stress test results ==="; 169 traceln "Total requests: %d" num_requests; 170 traceln "Successful: %d" successful; 171 traceln "Failed: %d" failed; 172 traceln "Duration: %.2fs" duration; 173 traceln "Throughput: %.0f req/s" (float_of_int successful /. duration); 174 traceln "Average latency: %.2fms" (duration *. 1000.0 /. float_of_int successful); 175 176 traceln "\n=== Connection pool statistics ==="; 177 let all_stats = Conpool.all_stats pool in 178 179 (* Calculate totals *) 180 let total_created = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_created s) 0 all_stats in 181 let total_reused = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_reused s) 0 all_stats in 182 let total_closed = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_closed s) 0 all_stats in 183 let total_errors = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.errors s) 0 all_stats in 184 185 traceln "Total connections created: %d" total_created; 186 traceln "Total connections reused: %d" total_reused; 187 traceln "Total connections closed: %d" total_closed; 188 traceln "Total errors: %d" total_errors; 189 traceln "Connection reuse ratio: %.2fx (reused/created)" 190 (if total_created > 0 then float_of_int total_reused /. float_of_int total_created else 0.0); 191 traceln "Pool efficiency: %.1f%% (avoided creating %d connections)" 192 (if successful > 0 then 100.0 *. float_of_int total_reused /. float_of_int successful else 0.0) 193 total_reused; 194 195 traceln "\nPer-endpoint breakdown:"; 196 List.iter (fun (endpoint, stats) -> 197 traceln " %a: created=%d reused=%d active=%d idle=%d" 198 Conpool.Endpoint.pp endpoint 199 (Conpool.Stats.total_created stats) 200 (Conpool.Stats.total_reused stats) 201 (Conpool.Stats.active stats) 202 (Conpool.Stats.idle stats) 203 ) all_stats; 204 205 traceln "\n=== Verifying server-side connection counts ==="; 206 List.iter2 (fun (addr_str, _addr, port) conn_ref -> 207 let count = Atomic.get conn_ref in 208 traceln "Server %s:%d - Active connections: %d" addr_str port count 209 ) servers connection_refs; 210 211 traceln "\n=== Test completed successfully ==="