···
Spawns variable number of echo servers on random ports, then exercises
the connection pool with multiple parallel client fibers.
+
Collects detailed event traces for visualization.
(** Configuration for the stress test *)
+
name : string; (** Test name for identification *)
num_servers : int; (** Number of echo servers to spawn *)
num_clients : int; (** Number of client connections per server *)
messages_per_client : int; (** Number of messages each client sends *)
···
···
+
(** Test presets for different scenarios *)
+
(* High connection reuse - few connections, many messages *)
+
messages_per_client = 50;
+
max_parallel_clients = 10;
+
(* Many endpoints - test endpoint scaling *)
+
{ name = "many_endpoints";
+
messages_per_client = 10;
+
max_parallel_clients = 50;
+
(* High concurrency - stress parallel connections *)
+
{ name = "high_concurrency";
+
messages_per_client = 5;
+
max_parallel_clients = 100;
+
(* Large messages - test throughput *)
+
{ name = "large_messages";
+
messages_per_client = 20;
+
max_parallel_clients = 30;
+
(* Constrained pool - force queuing *)
+
{ name = "constrained_pool";
+
messages_per_client = 10;
+
max_parallel_clients = 50;
+
(* Burst traffic - many clients, few messages each *)
+
{ name = "burst_traffic";
+
messages_per_client = 2;
+
max_parallel_clients = 100;
+
(** Extended stress test - 100x messages, 10x clients/servers *)
+
let extended_preset = {
+
name = "extended_stress";
+
messages_per_client = 100;
+
max_parallel_clients = 500;
(** Statistics collected during test *)
+
let create_latency_stats () = {
let update_latency stats latency =
+
stats.count <- stats.count + 1;
+
stats.total <- stats.total +. latency;
+
stats.min <- min stats.min latency;
+
stats.max <- max stats.max latency
(** Generate a random message of given size *)
let generate_message size =
···
String.init size (fun _ -> chars.[Random.int len])
(** Echo server handler - echoes back everything it receives *)
+
let handle_echo_client flow _addr =
let buf = Cstruct.create 4096 in
match Eio.Flow.single_read flow buf with
···
let data = Cstruct.sub buf 0 n in
Eio.Flow.write flow [data];
+
| exception End_of_file -> ()
(** Start an echo server on a random port, returns the port number *)
let start_echo_server ~sw net =
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 0) in
let listening_socket = Eio.Net.listen net ~sw ~backlog:128 ~reuse_addr:true addr in
let actual_addr = Eio.Net.listening_addr listening_socket in
let port = match actual_addr with
| _ -> failwith "Expected TCP address"
Eio.Fiber.fork_daemon ~sw (fun () ->
Eio.Net.accept_fork ~sw listening_socket
+
~on_error:(fun _ -> ())
···
(** Client test: connect via pool, send message, verify echo *)
+
let run_client_test ~clock ~collector pool endpoint endpoint_id message client_id latency_stats errors =
let msg_len = String.length message in
let start_time = Eio.Time.now clock in
+
(* Get or create connection ID for tracking *)
+
let conn_id = Trace.next_connection_id collector in
Conpool.with_connection pool endpoint (fun flow ->
+
(* Record acquire event *)
+
Trace.record collector ~clock ~event_type:Trace.Connection_acquired
+
~endpoint_id ~connection_id:conn_id ~client_id ();
Eio.Flow.copy_string message flow;
+
Eio.Flow.copy_string "\n" flow;
+
Trace.record collector ~clock ~event_type:Trace.Message_sent
+
~endpoint_id ~connection_id:conn_id ~client_id ();
let response = Eio.Buf_read.of_flow flow ~max_size:(msg_len + 1) in
let echoed = Eio.Buf_read.line response in
+
Trace.record collector ~clock ~event_type:Trace.Message_received
+
~endpoint_id ~connection_id:conn_id ~client_id ();
let end_time = Eio.Time.now clock in
+
let latency = (end_time -. start_time) *. 1000.0 in (* Convert to ms *)
if String.equal echoed message then begin
+
update_latency latency_stats latency;
+
Trace.record collector ~clock ~event_type:Trace.Message_verified
+
~endpoint_id ~connection_id:conn_id ~client_id ()
+
Trace.record collector ~clock ~event_type:(Trace.Connection_error "echo_mismatch")
+
~endpoint_id ~connection_id:conn_id ~client_id ()
+
(* Record release event *)
+
Trace.record collector ~clock ~event_type:Trace.Connection_released
+
~endpoint_id ~connection_id:conn_id ~client_id ()
+
Trace.record collector ~clock ~event_type:(Trace.Connection_error (Printexc.to_string ex))
+
~endpoint_id ~connection_id:conn_id ~client_id ()
(** Run a single client that sends multiple messages *)
+
let run_client ~clock ~collector pool endpoints config latency_stats errors client_id =
+
for _ = 1 to config.messages_per_client do
let endpoint_idx = Random.int (Array.length endpoints) in
let endpoint = endpoints.(endpoint_idx) in
+
let message = Printf.sprintf "c%d-%s" client_id (generate_message config.message_size) in
+
run_client_test ~clock ~collector pool endpoint endpoint_idx message client_id latency_stats errors
+
(** Main stress test runner - returns a test trace *)
+
let run_stress_test ~env config : Trace.test_trace =
let net = Eio.Stdenv.net env in
let clock = Eio.Stdenv.clock env in
+
let collector = Trace.create_collector () in
+
let latency_stats = create_latency_stats () in
+
let ports = ref [||] in
+
let trace_config : Trace.test_config = {
+
num_servers = config.num_servers;
+
num_clients = config.num_clients;
+
messages_per_client = config.messages_per_client;
+
max_parallel_clients = config.max_parallel_clients;
+
message_size = config.message_size;
+
pool_size = config.pool_size;
+
let start_unix_time = Unix.gettimeofday () in
+
let result = ref None in
+
Eio.Switch.run @@ fun sw ->
+
(* Start echo servers *)
+
ports := Array.init config.num_servers (fun _ ->
+
start_echo_server ~sw net
+
Eio.Time.sleep clock 0.05;
+
let endpoints = Array.map (fun port ->
+
Conpool.Endpoint.make ~host:"127.0.0.1" ~port
+
(* Create connection pool with hooks to track events *)
+
let pool_config = Conpool.Config.make
+
~max_connections_per_endpoint:config.pool_size
+
~max_connection_lifetime:120.0
+
~on_connection_created:(fun ep ->
+
let port = Conpool.Endpoint.port ep in
+
let endpoint_id = Array.to_list !ports
+
|> List.mapi (fun i p -> (i, p))
+
|> List.find (fun (_, p) -> p = port)
+
let conn_id = Trace.next_connection_id collector in
+
Trace.record collector ~clock ~event_type:Trace.Connection_created
+
~endpoint_id ~connection_id:conn_id ()
+
~on_connection_reused:(fun ep ->
+
let port = Conpool.Endpoint.port ep in
+
let endpoint_id = Array.to_list !ports
+
|> List.mapi (fun i p -> (i, p))
+
|> List.find (fun (_, p) -> p = port)
+
let conn_id = Trace.next_connection_id collector in
+
Trace.record collector ~clock ~event_type:Trace.Connection_reused
+
~endpoint_id ~connection_id:conn_id ()
+
~on_connection_closed:(fun ep ->
+
let port = Conpool.Endpoint.port ep in
+
let endpoint_id = Array.to_list !ports
+
|> List.mapi (fun i p -> (i, p))
+
|> List.find (fun (_, p) -> p = port)
+
let conn_id = Trace.next_connection_id collector in
+
Trace.record collector ~clock ~event_type:Trace.Connection_closed
+
~endpoint_id ~connection_id:conn_id ()
+
let pool = Conpool.create ~sw ~net ~clock ~config:pool_config () in
+
(* Record start time *)
+
let start_time = Eio.Time.now clock in
+
Trace.set_start_time collector start_time;
+
(* Run clients in parallel *)
+
let total_clients = config.num_servers * config.num_clients in
+
let client_ids = List.init total_clients (fun i -> i) in
+
Eio.Fiber.List.iter ~max_fibers:config.max_parallel_clients
+
run_client ~clock ~collector pool endpoints config latency_stats errors client_id)
+
let end_time = Eio.Time.now clock in
+
let duration = end_time -. start_time in
+
let events = Trace.get_events collector in
+
let endpoint_summaries = Trace.compute_endpoint_summaries events config.num_servers !ports in
+
Trace.test_name = config.name;
+
start_time = start_unix_time;
+
total_messages = latency_stats.count;
+
total_errors = !errors;
+
throughput = float_of_int latency_stats.count /. duration;
+
avg_latency = if latency_stats.count > 0
+
then latency_stats.total /. float_of_int latency_stats.count
+
min_latency = if latency_stats.count > 0 then latency_stats.min else 0.0;
+
max_latency = latency_stats.max;
+
Eio.Switch.fail sw Exit
+
| None -> failwith "Test failed to produce result"
+
(** Run all preset tests and return traces *)
+
let run_all_presets ~env =
+
List.map (fun config ->
+
Printf.eprintf "Running test: %s\n%!" config.name;
+
run_stress_test ~env config
(** Parse command line arguments *)
+
let mode = ref (Single default_config) in
+
let name = ref default_config.name in
let num_servers = ref default_config.num_servers in
let num_clients = ref default_config.num_clients in
let messages_per_client = ref default_config.messages_per_client in
let max_parallel = ref default_config.max_parallel_clients in
let message_size = ref default_config.message_size in
let pool_size = ref default_config.pool_size in
+
let output_file = ref "stress_test_results.json" in
+
("--all", Arg.Unit (fun () -> mode := AllPresets),
+
"Run all preset test configurations");
+
("--extended", Arg.Unit (fun () -> mode := Extended),
+
"Run extended stress test (30 servers, 1000 clients, 100 msgs each = 3M messages)");
+
("--list", Arg.Unit (fun () -> mode := ListPresets),
+
"List available presets");
+
("--preset", Arg.String (fun p ->
+
match List.find_opt (fun c -> c.name = p) presets with
+
| Some c -> mode := Single c
+
| None -> failwith (Printf.sprintf "Unknown preset: %s" p)),
+
"Use a named preset configuration");
+
("-n", Arg.Set_string name, "Test name");
+
("-s", Arg.Set_int num_servers, Printf.sprintf "Number of servers (default: %d)" default_config.num_servers);
+
("-c", Arg.Set_int num_clients, Printf.sprintf "Clients per server (default: %d)" default_config.num_clients);
+
("-m", Arg.Set_int messages_per_client, Printf.sprintf "Messages per client (default: %d)" default_config.messages_per_client);
+
("-p", Arg.Set_int max_parallel, Printf.sprintf "Max parallel clients (default: %d)" default_config.max_parallel_clients);
+
("-b", Arg.Set_int message_size, Printf.sprintf "Message size (default: %d)" default_config.message_size);
+
("-P", Arg.Set_int pool_size, Printf.sprintf "Pool size per endpoint (default: %d)" default_config.pool_size);
+
("-o", Arg.Set_string output_file, "Output JSON file (default: stress_test_results.json)");
+
let usage = "Usage: stress_test [options]\n\nOptions:" in
Arg.parse specs (fun _ -> ()) usage;
num_servers = !num_servers;
num_clients = !num_clients;
messages_per_client = !messages_per_client;
max_parallel_clients = !max_parallel;
message_size = !message_size;
+
(!mode, config, !output_file)
+
let (mode, custom_config, output_file) = parse_args () in
+
Printf.printf "Available presets:\n";
+
Printf.printf " %s: %d servers, %d clients, %d msgs/client, pool=%d\n"
+
c.name c.num_servers c.num_clients c.messages_per_client c.pool_size
+
let config = if config.name = "default" then custom_config else config in
+
Eio_main.run @@ fun env ->
+
let trace = run_stress_test ~env config in
+
let json = Printf.sprintf "[%s]" (Trace.trace_to_json trace) in
+
let oc = open_out output_file in
+
Printf.printf "Results written to %s\n" output_file;
+
Printf.printf "Test: %s - %d messages, %.2f msg/s, %.2fms avg latency, %d errors\n"
+
trace.test_name trace.total_messages trace.throughput trace.avg_latency trace.total_errors
+
Eio_main.run @@ fun env ->
+
let traces = run_all_presets ~env in
+
let json = "[" ^ String.concat ",\n" (List.map Trace.trace_to_json traces) ^ "]" in
+
let oc = open_out output_file in
+
Printf.printf "Results written to %s\n" output_file;
+
Printf.printf " %s: %d messages, %.2f msg/s, %.2fms avg latency, %d errors\n"
+
t.Trace.test_name t.total_messages t.throughput t.avg_latency t.total_errors
+
Printf.printf "Running extended stress test: %d servers, %d clients/server, %d msgs/client\n"
+
extended_preset.num_servers extended_preset.num_clients extended_preset.messages_per_client;
+
Printf.printf "Total messages: %d\n%!"
+
(extended_preset.num_servers * extended_preset.num_clients * extended_preset.messages_per_client);
+
Eio_main.run @@ fun env ->
+
let trace = run_stress_test ~env extended_preset in
+
let json = Printf.sprintf "[%s]" (Trace.trace_to_json trace) in
+
let oc = open_out output_file in
+
Printf.printf "Results written to %s\n" output_file;
+
Printf.printf "Test: %s - %d messages, %.2f msg/s, %.2fms avg latency, %d errors\n"
+
trace.test_name trace.total_messages trace.throughput trace.avg_latency trace.total_errors