TCP/TLS connection pooling for Eio

testcase

Changed files
+340
test
+3
test/dune
···
+
(executable
+
(name stress_test)
+
(libraries conpool eio eio_main logs logs.fmt fmt))
+337
test/stress_test.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
+
(** Stress test framework for conpool
+
+
Spawns variable number of echo servers on random ports, then exercises
+
the connection pool with multiple parallel client fibers.
+
*)
+
+
let src = Logs.Src.create "stress_test" ~doc:"Connection pool stress test"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
(** Configuration for the stress test *)
+
type config = {
+
num_servers : int; (** Number of echo servers to spawn *)
+
num_clients : int; (** Number of client connections per server *)
+
messages_per_client : int; (** Number of messages each client sends *)
+
max_parallel_clients : int; (** Maximum concurrent client fibers *)
+
message_size : int; (** Size of each message in bytes *)
+
pool_size : int; (** Max connections per endpoint *)
+
}
+
+
let default_config = {
+
num_servers = 3;
+
num_clients = 10;
+
messages_per_client = 5;
+
max_parallel_clients = 20;
+
message_size = 64;
+
pool_size = 5;
+
}
+
+
(** Statistics collected during test *)
+
type stats = {
+
mutable total_connections : int;
+
mutable total_messages : int;
+
mutable total_bytes : int;
+
mutable errors : int;
+
mutable min_latency : float;
+
mutable max_latency : float;
+
mutable total_latency : float;
+
}
+
+
let create_stats () = {
+
total_connections = 0;
+
total_messages = 0;
+
total_bytes = 0;
+
errors = 0;
+
min_latency = Float.infinity;
+
max_latency = 0.0;
+
total_latency = 0.0;
+
}
+
+
let update_latency stats latency =
+
stats.min_latency <- min stats.min_latency latency;
+
stats.max_latency <- max stats.max_latency latency;
+
stats.total_latency <- stats.total_latency +. latency
+
+
(** Generate a random message of given size *)
+
let generate_message size =
+
let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" in
+
let len = String.length chars in
+
String.init size (fun _ -> chars.[Random.int len])
+
+
(** Echo server handler - echoes back everything it receives *)
+
let handle_echo_client flow addr =
+
Log.debug (fun m -> m "Echo server: accepted connection from %a"
+
Eio.Net.Sockaddr.pp addr);
+
let buf = Cstruct.create 4096 in
+
let rec loop () =
+
match Eio.Flow.single_read flow buf with
+
| n ->
+
let data = Cstruct.sub buf 0 n in
+
Eio.Flow.write flow [data];
+
loop ()
+
| exception End_of_file ->
+
Log.debug (fun m -> m "Echo server: client disconnected from %a"
+
Eio.Net.Sockaddr.pp addr)
+
in
+
loop ()
+
+
(** Start an echo server on a random port, returns the port number *)
+
let start_echo_server ~sw net =
+
(* Listen on port 0 to get a random available port *)
+
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 0) in
+
let listening_socket = Eio.Net.listen net ~sw ~backlog:128 ~reuse_addr:true addr in
+
+
(* Get the actual port assigned *)
+
let actual_addr = Eio.Net.listening_addr listening_socket in
+
let port = match actual_addr with
+
| `Tcp (_, port) -> port
+
| _ -> failwith "Expected TCP address"
+
in
+
+
Log.info (fun m -> m "Echo server started on port %d" port);
+
+
(* Start accepting connections in a daemon fiber.
+
The daemon runs until cancelled when the switch finishes. *)
+
Eio.Fiber.fork_daemon ~sw (fun () ->
+
try
+
while true do
+
Eio.Net.accept_fork ~sw listening_socket
+
~on_error:(fun ex ->
+
Log.warn (fun m -> m "Echo server error: %a" Fmt.exn ex))
+
handle_echo_client
+
done;
+
`Stop_daemon
+
with Eio.Cancel.Cancelled _ ->
+
`Stop_daemon
+
);
+
+
port
+
+
(** Client test: connect via pool, send message, verify echo *)
+
let run_client_test ~clock pool endpoint message test_stats =
+
let msg_len = String.length message in
+
let start_time = Eio.Time.now clock in
+
+
try
+
Conpool.with_connection pool endpoint (fun flow ->
+
(* Send message *)
+
Eio.Flow.copy_string message flow;
+
Eio.Flow.copy_string "\n" flow; (* delimiter *)
+
+
(* Read echo response *)
+
let response = Eio.Buf_read.of_flow flow ~max_size:(msg_len + 1) in
+
let echoed = Eio.Buf_read.line response in
+
+
let end_time = Eio.Time.now clock in
+
let latency = end_time -. start_time in
+
+
if String.equal echoed message then begin
+
test_stats.total_messages <- test_stats.total_messages + 1;
+
test_stats.total_bytes <- test_stats.total_bytes + msg_len;
+
update_latency test_stats latency;
+
Log.debug (fun m -> m "Client: echoed %d bytes in %.3fms"
+
msg_len (latency *. 1000.0))
+
end else begin
+
test_stats.errors <- test_stats.errors + 1;
+
Log.err (fun m -> m "Client: echo mismatch! sent=%S got=%S" message echoed)
+
end
+
);
+
test_stats.total_connections <- test_stats.total_connections + 1
+
with ex ->
+
test_stats.errors <- test_stats.errors + 1;
+
Log.err (fun m -> m "Client error: %a" Fmt.exn ex)
+
+
(** Run a single client that sends multiple messages *)
+
let run_client ~clock pool endpoints config test_stats client_id =
+
Log.debug (fun m -> m "Starting client %d" client_id);
+
+
for msg_num = 1 to config.messages_per_client do
+
(* Pick a random endpoint *)
+
let endpoint_idx = Random.int (Array.length endpoints) in
+
let endpoint = endpoints.(endpoint_idx) in
+
+
(* Generate unique message *)
+
let message = Printf.sprintf "client%d-msg%d-%s"
+
client_id msg_num (generate_message config.message_size) in
+
+
run_client_test ~clock pool endpoint message test_stats
+
done;
+
+
Log.debug (fun m -> m "Client %d completed" client_id)
+
+
(** Main stress test runner *)
+
let run_stress_test ~env config =
+
let net = Eio.Stdenv.net env in
+
let clock = Eio.Stdenv.clock env in
+
+
Log.info (fun m -> m "=== Stress Test Configuration ===");
+
Log.info (fun m -> m "Servers: %d" config.num_servers);
+
Log.info (fun m -> m "Clients per server: %d" config.num_clients);
+
Log.info (fun m -> m "Messages per client: %d" config.messages_per_client);
+
Log.info (fun m -> m "Max parallel clients: %d" config.max_parallel_clients);
+
Log.info (fun m -> m "Message size: %d bytes" config.message_size);
+
Log.info (fun m -> m "Pool size per endpoint: %d" config.pool_size);
+
+
(* Use a sub-switch for servers so we can cancel them when done *)
+
let test_passed = ref false in
+
let expected_messages = ref 0 in
+
+
Eio.Switch.run @@ fun sw ->
+
(* Start echo servers *)
+
Log.info (fun m -> m "Starting %d echo servers..." config.num_servers);
+
let ports = Array.init config.num_servers (fun _ ->
+
start_echo_server ~sw net
+
) in
+
+
(* Small delay to ensure servers are ready *)
+
Eio.Time.sleep clock 0.1;
+
+
(* Create endpoints for all servers *)
+
let endpoints = Array.map (fun port ->
+
Conpool.Endpoint.make ~host:"127.0.0.1" ~port
+
) ports in
+
+
Log.info (fun m -> m "Servers ready on ports: %s"
+
(String.concat ", " (Array.to_list (Array.map string_of_int ports))));
+
+
(* Create connection pool *)
+
let pool_config = Conpool.Config.make
+
~max_connections_per_endpoint:config.pool_size
+
~max_idle_time:30.0
+
~max_connection_lifetime:120.0
+
~connect_timeout:5.0
+
~connect_retry_count:3
+
()
+
in
+
+
let pool = Conpool.create ~sw ~net ~clock ~config:pool_config () in
+
Log.info (fun m -> m "Connection pool created");
+
+
(* Initialize test statistics *)
+
let test_stats = create_stats () in
+
+
(* Calculate total clients *)
+
let total_clients = config.num_servers * config.num_clients in
+
expected_messages := total_clients * config.messages_per_client;
+
Log.info (fun m -> m "Running %d total clients..." total_clients);
+
+
let start_time = Eio.Time.now clock in
+
+
(* Run clients in parallel using Fiber.List *)
+
let client_ids = List.init total_clients (fun i -> i) in
+
Eio.Fiber.List.iter ~max_fibers:config.max_parallel_clients
+
(fun client_id ->
+
run_client ~clock pool endpoints config test_stats client_id)
+
client_ids;
+
+
let end_time = Eio.Time.now clock in
+
let total_time = end_time -. start_time in
+
+
(* Print results *)
+
Log.info (fun m -> m "");
+
Log.info (fun m -> m "=== Test Results ===");
+
Log.info (fun m -> m "Total time: %.3fs" total_time);
+
Log.info (fun m -> m "Total connections: %d" test_stats.total_connections);
+
Log.info (fun m -> m "Total messages: %d" test_stats.total_messages);
+
Log.info (fun m -> m "Total bytes transferred: %d" test_stats.total_bytes);
+
Log.info (fun m -> m "Errors: %d" test_stats.errors);
+
+
if test_stats.total_messages > 0 then begin
+
let avg_latency = test_stats.total_latency /.
+
float_of_int test_stats.total_messages in
+
Log.info (fun m -> m "Latency (min/avg/max): %.3fms / %.3fms / %.3fms"
+
(test_stats.min_latency *. 1000.0)
+
(avg_latency *. 1000.0)
+
(test_stats.max_latency *. 1000.0));
+
Log.info (fun m -> m "Throughput: %.1f messages/sec"
+
(float_of_int test_stats.total_messages /. total_time));
+
Log.info (fun m -> m "Bandwidth: %.1f KB/sec"
+
(float_of_int test_stats.total_bytes /. total_time /. 1024.0))
+
end;
+
+
(* Print pool statistics for each endpoint *)
+
Log.info (fun m -> m "");
+
Log.info (fun m -> m "=== Pool Statistics ===");
+
Array.iteri (fun i endpoint ->
+
let stats = Conpool.stats pool endpoint in
+
Log.info (fun m -> m "Endpoint %d (port %d):" i ports.(i));
+
Log.info (fun m -> m " Active: %d, Idle: %d"
+
(Conpool.Stats.active stats) (Conpool.Stats.idle stats));
+
Log.info (fun m -> m " Created: %d, Reused: %d, Closed: %d, Errors: %d"
+
(Conpool.Stats.total_created stats)
+
(Conpool.Stats.total_reused stats)
+
(Conpool.Stats.total_closed stats)
+
(Conpool.Stats.errors stats))
+
) endpoints;
+
+
(* Verify success *)
+
test_passed := test_stats.errors = 0 &&
+
test_stats.total_messages = !expected_messages;
+
+
if !test_passed then
+
Log.info (fun m -> m "TEST PASSED: All %d messages echoed successfully!"
+
!expected_messages)
+
else
+
Log.err (fun m -> m "TEST FAILED: Expected %d messages, got %d with %d errors"
+
!expected_messages test_stats.total_messages test_stats.errors);
+
+
(* Cancel the switch to stop servers and exit cleanly *)
+
Eio.Switch.fail sw Exit
+
+
(** Parse command line arguments *)
+
let parse_config () =
+
let num_servers = ref default_config.num_servers in
+
let num_clients = ref default_config.num_clients in
+
let messages_per_client = ref default_config.messages_per_client in
+
let max_parallel = ref default_config.max_parallel_clients in
+
let message_size = ref default_config.message_size in
+
let pool_size = ref default_config.pool_size in
+
let verbose = ref false in
+
+
let specs = [
+
("-s", Arg.Set_int num_servers,
+
Printf.sprintf "Number of echo servers (default: %d)" default_config.num_servers);
+
("-c", Arg.Set_int num_clients,
+
Printf.sprintf "Clients per server (default: %d)" default_config.num_clients);
+
("-m", Arg.Set_int messages_per_client,
+
Printf.sprintf "Messages per client (default: %d)" default_config.messages_per_client);
+
("-p", Arg.Set_int max_parallel,
+
Printf.sprintf "Max parallel clients (default: %d)" default_config.max_parallel_clients);
+
("-b", Arg.Set_int message_size,
+
Printf.sprintf "Message size in bytes (default: %d)" default_config.message_size);
+
("-P", Arg.Set_int pool_size,
+
Printf.sprintf "Pool size per endpoint (default: %d)" default_config.pool_size);
+
("-v", Arg.Set verbose, "Enable verbose/debug logging");
+
] in
+
+
let usage = "Usage: stress_test [options]" in
+
Arg.parse specs (fun _ -> ()) usage;
+
+
(* Configure logging *)
+
Logs.set_reporter (Logs_fmt.reporter ());
+
if !verbose then
+
Logs.set_level (Some Logs.Debug)
+
else
+
Logs.set_level (Some Logs.Info);
+
+
{
+
num_servers = !num_servers;
+
num_clients = !num_clients;
+
messages_per_client = !messages_per_client;
+
max_parallel_clients = !max_parallel;
+
message_size = !message_size;
+
pool_size = !pool_size;
+
}
+
+
let () =
+
Random.self_init ();
+
let config = parse_config () in
+
Eio_main.run @@ fun env ->
+
(* Catch Exit which is used to signal clean shutdown *)
+
try run_stress_test ~env config
+
with Exit -> ()