TCP/TLS connection pooling for Eio

Compare changes

Choose any two refs to compare.

+17 -1
.gitignore
···
-
_build
+
# OCaml build artifacts
+
_build/
+
*.install
+
*.merlin
+
+
# Third-party sources (fetch locally with opam source)
+
third_party/
+
+
# Editor and OS files
+
.DS_Store
+
*.swp
+
*~
+
.vscode/
+
.idea/
+
+
# Opam local switch
+
_opam/
+1
.ocamlformat
···
+
version=0.28.1
+54
.tangled/workflows/build.yml
···
+
when:
+
- event: ["push", "pull_request"]
+
branch: ["main"]
+
+
engine: nixery
+
+
dependencies:
+
nixpkgs:
+
- shell
+
- stdenv
+
- findutils
+
- binutils
+
- libunwind
+
- ncurses
+
- opam
+
- git
+
- gawk
+
- gnupatch
+
- gnum4
+
- gnumake
+
- gnutar
+
- gnused
+
- gnugrep
+
- diffutils
+
- gzip
+
- bzip2
+
- gcc
+
- ocaml
+
- pkg-config
+
+
steps:
+
- name: opam
+
command: |
+
opam init --disable-sandboxing -a -y
+
- name: repo
+
command: |
+
opam repo add aoah https://tangled.org/anil.recoil.org/aoah-opam-repo.git
+
- name: switch
+
command: |
+
export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$(nix build nixpkgs#gmp.dev --no-link --print-out-paths)/lib/pkgconfig"
+
opam install . --confirm-level=unsafe-yes --deps-only
+
- name: build
+
command: |
+
opam exec -- dune build -p conpool
+
- name: switch-test
+
command: |
+
opam install . --confirm-level=unsafe-yes --deps-only --with-test
+
- name: test
+
command: |
+
opam exec -- dune runtest --verbose
+
- name: doc
+
command: |
+
opam install -y odoc
+
opam exec -- dune build @doc
+3
CHANGES.md
···
+
# v1.0.0 (dev)
+
+
- Initial release of Conpool
+15
LICENSE.md
···
+
ISC License
+
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>
+
+
Permission to use, copy, modify, and distribute this software for any
+
purpose with or without fee is hereby granted, provided that the above
+
copyright notice and this permission notice appear in all copies.
+
+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+110
README.md
···
+
# Conpool - Protocol-agnostic Connection Pooling for Eio
+
+
Conpool is a connection pooling library built on Eio that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol.
+
+
## Key Features
+
+
- **Protocol-agnostic**: Works with HTTP, Redis, PostgreSQL, or any TCP-based protocol
+
- **Health validation**: Automatically validates connections before reuse
+
- **Per-endpoint limits**: Independent connection limits and pooling for each endpoint
+
- **TLS support**: Optional TLS configuration for secure connections
+
- **Statistics & monitoring**: Track connection usage, hits/misses, and health status
+
- **Built on Eio**: Leverages Eio's structured concurrency and resource management
+
+
## Usage
+
+
Basic example establishing a connection pool:
+
+
```ocaml
+
open Eio.Std
+
+
let run env =
+
Switch.run (fun sw ->
+
(* Create a connection pool *)
+
let pool = Conpool.create
+
~sw
+
~net:(Eio.Stdenv.net env)
+
~clock:(Eio.Stdenv.clock env)
+
()
+
in
+
+
(* Define an endpoint *)
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:80 in
+
+
(* Use a connection from the pool *)
+
Conpool.with_connection pool endpoint (fun conn ->
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf
+
)
+
)
+
```
+
+
With TLS configuration:
+
+
```ocaml
+
let run env =
+
Switch.run (fun sw ->
+
(* Create TLS configuration - SNI servername is automatically set to the endpoint's hostname *)
+
let tls_config = Tls.Config.client ~authenticator:(Result.get_ok (Ca_certs.authenticator ())) () |> Result.get_ok in
+
+
(* Create pool with TLS *)
+
let pool = Conpool.create
+
~sw
+
~net:(Eio.Stdenv.net env)
+
~clock:(Eio.Stdenv.clock env)
+
~tls:tls_config
+
()
+
in
+
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
+
Conpool.with_connection pool endpoint (fun conn ->
+
(* Use TLS-encrypted connection *)
+
...
+
)
+
)
+
```
+
+
Custom pool configuration:
+
+
```ocaml
+
let config = Conpool.Config.make
+
~max_connections_per_endpoint:20
+
~max_idle_time:30.0
+
~connect_timeout:10.0
+
~max_connection_lifetime:300.0
+
()
+
in
+
+
let pool = Conpool.create ~sw ~net ~clock ~config ()
+
```
+
+
Monitor pool statistics:
+
+
```ocaml
+
let stats = Conpool.stats pool endpoint in
+
Printf.printf "Active: %d, Idle: %d, Hits: %d, Misses: %d\n"
+
(Conpool.Stats.active_connections stats)
+
(Conpool.Stats.idle_connections stats)
+
(Conpool.Stats.cache_hits stats)
+
(Conpool.Stats.cache_misses stats)
+
```
+
+
## Installation
+
+
```
+
opam install conpool
+
```
+
+
## Documentation
+
+
API documentation is available at https://tangled.org/@anil.recoil.org/ocaml-conpool or via:
+
+
```
+
opam install conpool
+
odig doc conpool
+
```
+
+
## License
+
+
ISC
+8 -8
conpool.opam
···
synopsis: "Protocol-agnostic TCP/IP connection pooling library for Eio"
description:
"Conpool is a connection pooling library built on Eio.Pool that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.)"
-
maintainer: ["Your Name"]
-
authors: ["Your Name"]
-
license: "MIT"
-
homepage: "https://github.com/username/conpool"
-
bug-reports: "https://github.com/username/conpool/issues"
+
maintainer: ["Anil Madhavapeddy <anil@recoil.org>"]
+
authors: ["Anil Madhavapeddy <anil@recoil.org>"]
+
license: "ISC"
+
homepage: "https://tangled.org/@anil.recoil.org/ocaml-conpool"
+
bug-reports: "https://tangled.org/@anil.recoil.org/ocaml-conpool/issues"
depends: [
-
"ocaml"
-
"dune" {>= "3.0" & >= "3.0"}
+
"ocaml" {>= "5.1.0"}
+
"dune" {>= "3.20" & >= "3.0"}
"eio"
"tls-eio" {>= "1.0"}
"logs"
···
"@doc" {with-doc}
]
]
-
dev-repo: "git+https://github.com/username/conpool.git"
+
x-maintenance-intent: ["(latest)"]
+11 -11
dune-project
···
-
(lang dune 3.0)
+
(lang dune 3.20)
+
(name conpool)
(generate_opam_files true)
-
(source
-
(github username/conpool))
-
-
(authors "Your Name")
-
-
(maintainers "Your Name")
-
-
(license MIT)
+
(license ISC)
+
(authors "Anil Madhavapeddy <anil@recoil.org>")
+
(homepage "https://tangled.org/@anil.recoil.org/ocaml-conpool")
+
(maintainers "Anil Madhavapeddy <anil@recoil.org>")
+
(bug_reports "https://tangled.org/@anil.recoil.org/ocaml-conpool/issues")
+
(maintenance_intent "(latest)")
(package
(name conpool)
(synopsis "Protocol-agnostic TCP/IP connection pooling library for Eio")
(description "Conpool is a connection pooling library built on Eio.Pool that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.)")
(depends
-
ocaml
+
(ocaml (>= 5.1.0))
(dune (>= 3.0))
eio
(tls-eio (>= 1.0))
logs
fmt
-
cmdliner))
+
cmdliner
+
(odoc :with-doc)))
+30 -25
lib/cmd.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Cmdliner terms for connection pool configuration *)
open Cmdliner
let max_connections_per_endpoint =
let doc = "Maximum concurrent connections per endpoint." in
-
Arg.(value & opt int 10 & info ["max-connections-per-endpoint"] ~doc ~docv:"NUM")
+
Arg.(
+
value & opt int 10
+
& info [ "max-connections-per-endpoint" ] ~doc ~docv:"NUM")
let max_idle_time =
let doc = "Maximum time a connection can sit idle in seconds." in
-
Arg.(value & opt float 60.0 & info ["max-idle-time"] ~doc ~docv:"SECONDS")
+
Arg.(value & opt float 60.0 & info [ "max-idle-time" ] ~doc ~docv:"SECONDS")
let max_connection_lifetime =
let doc = "Maximum connection age in seconds." in
-
Arg.(value & opt float 300.0 & info ["max-connection-lifetime"] ~doc ~docv:"SECONDS")
+
Arg.(
+
value & opt float 300.0
+
& info [ "max-connection-lifetime" ] ~doc ~docv:"SECONDS")
let max_connection_uses =
let doc = "Maximum times a connection can be reused (omit for unlimited)." in
-
Arg.(value & opt (some int) None & info ["max-connection-uses"] ~doc ~docv:"NUM")
+
Arg.(
+
value
+
& opt (some int) None
+
& info [ "max-connection-uses" ] ~doc ~docv:"NUM")
let connect_timeout =
let doc = "Connection timeout in seconds." in
-
Arg.(value & opt float 10.0 & info ["connect-timeout"] ~doc ~docv:"SECONDS")
+
Arg.(value & opt float 10.0 & info [ "connect-timeout" ] ~doc ~docv:"SECONDS")
let connect_retry_count =
let doc = "Number of connection retry attempts." in
-
Arg.(value & opt int 3 & info ["connect-retry-count"] ~doc ~docv:"NUM")
+
Arg.(value & opt int 3 & info [ "connect-retry-count" ] ~doc ~docv:"NUM")
let connect_retry_delay =
let doc = "Initial retry delay in seconds (with exponential backoff)." in
-
Arg.(value & opt float 0.1 & info ["connect-retry-delay"] ~doc ~docv:"SECONDS")
+
Arg.(
+
value & opt float 0.1 & info [ "connect-retry-delay" ] ~doc ~docv:"SECONDS")
let config =
-
let make max_conn max_idle max_lifetime max_uses timeout retry_count retry_delay =
-
Config.make
-
~max_connections_per_endpoint:max_conn
-
~max_idle_time:max_idle
-
~max_connection_lifetime:max_lifetime
-
?max_connection_uses:max_uses
-
~connect_timeout:timeout
-
~connect_retry_count:retry_count
-
~connect_retry_delay:retry_delay
-
()
+
let make max_conn max_idle max_lifetime max_uses timeout retry_count
+
retry_delay =
+
Config.make ~max_connections_per_endpoint:max_conn ~max_idle_time:max_idle
+
~max_connection_lifetime:max_lifetime ?max_connection_uses:max_uses
+
~connect_timeout:timeout ~connect_retry_count:retry_count
+
~connect_retry_delay:retry_delay ()
in
-
Term.(const make
-
$ max_connections_per_endpoint
-
$ max_idle_time
-
$ max_connection_lifetime
-
$ max_connection_uses
-
$ connect_timeout
-
$ connect_retry_count
-
$ connect_retry_delay)
+
Term.(
+
const make $ max_connections_per_endpoint $ max_idle_time
+
$ max_connection_lifetime $ max_connection_uses $ connect_timeout
+
$ connect_retry_count $ connect_retry_delay)
+20 -22
lib/cmd.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Cmdliner terms for connection pool configuration *)
(** {1 Configuration Terms} *)
val max_connections_per_endpoint : int Cmdliner.Term.t
-
(** Cmdliner term for maximum connections per endpoint.
-
Default: 10
-
Flag: [--max-connections-per-endpoint] *)
+
(** Cmdliner term for maximum connections per endpoint. Default: 10. Flag:
+
[--max-connections-per-endpoint] *)
val max_idle_time : float Cmdliner.Term.t
-
(** Cmdliner term for maximum idle time in seconds.
-
Default: 60.0
-
Flag: [--max-idle-time] *)
+
(** Cmdliner term for maximum idle time in seconds. Default: 60.0. Flag:
+
[--max-idle-time] *)
val max_connection_lifetime : float Cmdliner.Term.t
-
(** Cmdliner term for maximum connection lifetime in seconds.
-
Default: 300.0
+
(** Cmdliner term for maximum connection lifetime in seconds. Default: 300.0.
Flag: [--max-connection-lifetime] *)
val max_connection_uses : int option Cmdliner.Term.t
-
(** Cmdliner term for maximum connection uses.
-
Default: None (unlimited)
-
Flag: [--max-connection-uses] *)
+
(** Cmdliner term for maximum connection uses. Default: None (unlimited). Flag:
+
[--max-connection-uses] *)
val connect_timeout : float Cmdliner.Term.t
-
(** Cmdliner term for connection timeout in seconds.
-
Default: 10.0
-
Flag: [--connect-timeout] *)
+
(** Cmdliner term for connection timeout in seconds. Default: 10.0. Flag:
+
[--connect-timeout] *)
val connect_retry_count : int Cmdliner.Term.t
-
(** Cmdliner term for number of connection retry attempts.
-
Default: 3
-
Flag: [--connect-retry-count] *)
+
(** Cmdliner term for number of connection retry attempts. Default: 3. Flag:
+
[--connect-retry-count] *)
val connect_retry_delay : float Cmdliner.Term.t
-
(** Cmdliner term for initial retry delay in seconds.
-
Default: 0.1
-
Flag: [--connect-retry-delay] *)
+
(** Cmdliner term for initial retry delay in seconds. Default: 0.1. Flag:
+
[--connect-retry-delay] *)
(** {1 Combined Terms} *)
val config : Config.t Cmdliner.Term.t
(** Cmdliner term that combines all configuration options into a {!Config.t}.
-
This term can be used in your application's main command to accept
-
all connection pool configuration options from the command line. *)
+
This term can be used in your application's main command to accept all
+
connection pool configuration options from the command line. *)
+42 -38
lib/config.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Configuration for connection pools *)
let src = Logs.Src.create "conpool.config" ~doc:"Connection pool configuration"
+
module Log = (val Logs.src_log src : Logs.LOG)
type t = {
···
max_idle_time : float;
max_connection_lifetime : float;
max_connection_uses : int option;
-
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
+
health_check :
+
([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option;
connect_timeout : float option;
connect_retry_count : int;
connect_retry_delay : float;
···
on_connection_reused : (Endpoint.t -> unit) option;
}
-
let make
-
?(max_connections_per_endpoint = 10)
-
?(max_idle_time = 60.0)
-
?(max_connection_lifetime = 300.0)
-
?max_connection_uses
-
?health_check
-
?(connect_timeout = 10.0)
-
?(connect_retry_count = 3)
-
?(connect_retry_delay = 0.1)
-
?on_connection_created
-
?on_connection_closed
-
?on_connection_reused
-
() =
+
let make ?(max_connections_per_endpoint = 10) ?(max_idle_time = 60.0)
+
?(max_connection_lifetime = 300.0) ?max_connection_uses ?health_check
+
?(connect_timeout = 10.0) ?(connect_retry_count = 3)
+
?(connect_retry_delay = 0.1) ?on_connection_created ?on_connection_closed
+
?on_connection_reused () =
(* Validate parameters *)
if max_connections_per_endpoint <= 0 then
-
invalid_arg (Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
-
max_connections_per_endpoint);
+
invalid_arg
+
(Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
+
max_connections_per_endpoint);
if max_idle_time <= 0.0 then
-
invalid_arg (Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
+
invalid_arg
+
(Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
if max_connection_lifetime <= 0.0 then
-
invalid_arg (Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
-
max_connection_lifetime);
+
invalid_arg
+
(Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
+
max_connection_lifetime);
(match max_connection_uses with
-
| Some n when n <= 0 ->
-
invalid_arg (Printf.sprintf "max_connection_uses must be positive, got %d" n)
-
| _ -> ());
+
| Some n when n <= 0 ->
+
invalid_arg
+
(Printf.sprintf "max_connection_uses must be positive, got %d" n)
+
| _ -> ());
if connect_timeout <= 0.0 then
-
invalid_arg (Printf.sprintf "connect_timeout must be positive, got %.2f" connect_timeout);
+
invalid_arg
+
(Printf.sprintf "connect_timeout must be positive, got %.2f"
+
connect_timeout);
if connect_retry_count < 0 then
-
invalid_arg (Printf.sprintf "connect_retry_count must be non-negative, got %d"
-
connect_retry_count);
+
invalid_arg
+
(Printf.sprintf "connect_retry_count must be non-negative, got %d"
+
connect_retry_count);
if connect_retry_delay <= 0.0 then
-
invalid_arg (Printf.sprintf "connect_retry_delay must be positive, got %.2f"
-
connect_retry_delay);
+
invalid_arg
+
(Printf.sprintf "connect_retry_delay must be positive, got %.2f"
+
connect_retry_delay);
-
Log.debug (fun m ->
-
m "Creating config: max_connections=%d, max_idle=%.1fs, max_lifetime=%.1fs"
-
max_connections_per_endpoint max_idle_time max_connection_lifetime);
{
max_connections_per_endpoint;
max_idle_time;
···
}
let default = make ()
-
let max_connections_per_endpoint t = t.max_connections_per_endpoint
let max_idle_time t = t.max_idle_time
let max_connection_lifetime t = t.max_connection_lifetime
···
- connect_timeout: %s@,\
- connect_retry_count: %d@,\
- connect_retry_delay: %.2fs@]"
-
t.max_connections_per_endpoint
-
t.max_idle_time
-
t.max_connection_lifetime
-
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
-
(match t.connect_timeout with Some f -> Fmt.str "%.1fs" f | None -> "none")
-
t.connect_retry_count
-
t.connect_retry_delay
+
t.max_connections_per_endpoint t.max_idle_time t.max_connection_lifetime
+
(match t.max_connection_uses with
+
| Some n -> string_of_int n
+
| None -> "unlimited")
+
(match t.connect_timeout with
+
| Some f -> Fmt.str "%.1fs" f
+
| None -> "none")
+
t.connect_retry_count t.connect_retry_delay
+24 -15
lib/config.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Configuration for connection pools *)
(** {1 Logging} *)
···
val src : Logs.Src.t
(** Logs source for configuration operations. Configure logging with:
{[
-
Logs.Src.set_level Conpool.Config.src (Some Logs.Debug);
-
]}
-
*)
+
Logs.Src.set_level Conpool.Config.src (Some Logs.Debug)
+
]} *)
(** {1 Type} *)
···
?max_idle_time:float ->
?max_connection_lifetime:float ->
?max_connection_uses:int ->
-
?health_check:([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) ->
+
?health_check:([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) ->
?connect_timeout:float ->
?connect_retry_count:int ->
?connect_retry_delay:float ->
?on_connection_created:(Endpoint.t -> unit) ->
?on_connection_closed:(Endpoint.t -> unit) ->
?on_connection_reused:(Endpoint.t -> unit) ->
-
unit -> t
+
unit ->
+
t
(** Create pool configuration with optional parameters.
-
@param max_connections_per_endpoint Maximum concurrent connections per endpoint (default: 10)
-
@param max_idle_time Maximum time a connection can sit idle in seconds (default: 60.0)
-
@param max_connection_lifetime Maximum connection age in seconds (default: 300.0)
-
@param max_connection_uses Maximum times a connection can be reused (default: unlimited)
+
@param max_connections_per_endpoint
+
Maximum concurrent connections per endpoint (default: 10)
+
@param max_idle_time
+
Maximum time a connection can sit idle in seconds (default: 60.0)
+
@param max_connection_lifetime
+
Maximum connection age in seconds (default: 300.0)
+
@param max_connection_uses
+
Maximum times a connection can be reused (default: unlimited)
@param health_check Custom health check function (default: none)
@param connect_timeout Connection timeout in seconds (default: 10.0)
@param connect_retry_count Number of connection retry attempts (default: 3)
-
@param connect_retry_delay Initial retry delay in seconds, with exponential backoff (default: 0.1)
+
@param connect_retry_delay
+
Initial retry delay in seconds, with exponential backoff (default: 0.1)
@param on_connection_created Hook called when a connection is created
@param on_connection_closed Hook called when a connection is closed
-
@param on_connection_reused Hook called when a connection is reused
-
*)
+
@param on_connection_reused Hook called when a connection is reused *)
val default : t
(** Sensible defaults for most use cases:
···
- connect_timeout: 10.0s
- connect_retry_count: 3
- connect_retry_delay: 0.1s
-
- hooks: none
-
*)
+
- hooks: none *)
(** {1 Accessors} *)
···
val max_connection_uses : t -> int option
(** Get maximum connection uses, if any. *)
-
val health_check : t -> ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option
+
val health_check :
+
t -> ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option
(** Get custom health check function, if any. *)
val connect_timeout : t -> float option
+17 -16
lib/connection.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Internal connection representation - not exposed in public API *)
-
let src = Logs.Src.create "conpool.connection" ~doc:"Connection pool internal connection management"
+
let src =
+
Logs.Src.create "conpool.connection"
+
~doc:"Connection pool internal connection management"
+
module Log = (val Logs.src_log src : Logs.LOG)
type t = {
-
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
+
flow : [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t;
created_at : float;
mutable last_used : float;
mutable use_count : int;
···
let flow t = t.flow
let endpoint t = t.endpoint
let created_at t = t.created_at
-
-
let last_used t =
-
Eio.Mutex.use_ro t.mutex (fun () -> t.last_used)
-
-
let use_count t =
-
Eio.Mutex.use_ro t.mutex (fun () -> t.use_count)
+
let last_used t = t.last_used
+
let use_count t = t.use_count
let update_usage t ~now =
Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
-
t.last_used <- now;
-
t.use_count <- t.use_count + 1
-
)
+
t.last_used <- now;
+
t.use_count <- t.use_count + 1)
let pp ppf t =
-
let uses = Eio.Mutex.use_ro t.mutex (fun () -> t.use_count) in
-
Fmt.pf ppf "Connection(endpoint=%a, age=%.2fs, uses=%d)"
-
Endpoint.pp t.endpoint
-
(Unix.gettimeofday () -. t.created_at)
-
uses
+
let uses = t.use_count in
+
Fmt.pf ppf "Connection(endpoint=%a, created_at=%.2f, uses=%d)" Endpoint.pp
+
t.endpoint t.created_at uses
+345 -315
lib/conpool.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
+
module Log = (val Logs.src_log src : Logs.LOG)
(* Re-export submodules *)
module Endpoint = Endpoint
-
module Tls_config = Tls_config
module Config = Config
module Stats = Stats
module Cmd = Cmd
···
type error =
| Dns_resolution_failed of { hostname : string }
-
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
+
| Connection_failed of {
+
endpoint : Endpoint.t;
+
attempts : int;
+
last_error : string;
+
}
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
| Invalid_config of string
| Invalid_endpoint of string
-
exception Pool_error of error
-
let pp_error ppf = function
| Dns_resolution_failed { hostname } ->
Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
| Connection_failed { endpoint; attempts; last_error } ->
-
Fmt.pf ppf "Failed to connect to %a after %d attempts: %s"
-
Endpoint.pp endpoint attempts last_error
+
Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
+
endpoint attempts last_error
| Connection_timeout { endpoint; timeout } ->
-
Fmt.pf ppf "Connection timeout to %a after %.2fs"
-
Endpoint.pp endpoint timeout
-
| Invalid_config msg ->
-
Fmt.pf ppf "Invalid configuration: %s" msg
-
| Invalid_endpoint msg ->
-
Fmt.pf ppf "Invalid endpoint: %s" msg
+
Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
+
timeout
+
| Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
+
| Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
+
+
type Eio.Exn.err += E of error
+
+
let err e = Eio.Exn.create (E e)
+
+
let () =
+
Eio.Exn.register_pp (fun f -> function
+
| E e ->
+
Fmt.string f "Conpool ";
+
pp_error f e;
+
true
+
| _ -> false)
+
+
(** {1 Connection Types} *)
+
+
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
+
type connection = connection_ty Eio.Resource.t
type endp_stats = {
mutable active : int;
···
net : 'net;
clock : 'clock;
config : Config.t;
-
tls : Tls_config.t option;
+
tls : Tls.Config.client option;
endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
endpoints_mutex : Eio.Mutex.t;
}
type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t
-
module EndpointTbl = Hashtbl.Make(struct
+
module EndpointTbl = Hashtbl.Make (struct
type t = Endpoint.t
+
let equal = Endpoint.equal
let hash = Endpoint.hash
end)
-
let get_time (pool : ('clock, 'net) internal) =
-
Eio.Time.now pool.clock
+
let get_time (pool : ('clock, 'net) internal) = Eio.Time.now pool.clock
-
let create_endp_stats () = {
-
active = 0;
-
idle = 0;
-
total_created = 0;
-
total_reused = 0;
-
total_closed = 0;
-
errors = 0;
-
}
+
let create_endp_stats () =
+
{
+
active = 0;
+
idle = 0;
+
total_created = 0;
+
total_reused = 0;
+
total_closed = 0;
+
errors = 0;
+
}
let snapshot_stats (stats : endp_stats) : Stats.t =
-
Stats.make
-
~active:stats.active
-
~idle:stats.idle
-
~total_created:stats.total_created
-
~total_reused:stats.total_reused
-
~total_closed:stats.total_closed
-
~errors:stats.errors
+
Stats.make ~active:stats.active ~idle:stats.idle
+
~total_created:stats.total_created ~total_reused:stats.total_reused
+
~total_closed:stats.total_closed ~errors:stats.errors
(** {1 DNS Resolution} *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
-
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
-
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
-
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
-
match addrs with
-
| addr :: _ ->
-
Log.debug (fun m -> m "Resolved %a to %a"
-
Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
-
addr
-
| [] ->
-
Log.err (fun m -> m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
-
raise (Pool_error (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
+
Log.debug (fun m -> m "Resolving %a" Endpoint.pp endpoint);
+
try
+
let addrs =
+
Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
+
~service:(string_of_int (Endpoint.port endpoint))
+
in
+
match addrs with
+
| addr :: _ ->
+
Log.debug (fun m ->
+
m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
+
addr
+
| [] ->
+
(* Raise exception with error code - context will be added when caught *)
+
raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
(** {1 Connection Creation with Retry} *)
-
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint attempt last_error =
+
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
+
attempt last_error =
let retry_count = Config.connect_retry_count pool.config in
-
if attempt > retry_count then begin
-
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
-
Endpoint.pp endpoint retry_count);
-
raise (Pool_error (Connection_failed { endpoint; attempts = retry_count; last_error }))
-
end;
+
if attempt > retry_count then
+
(* Raise exception with error code - context will be added when caught *)
+
raise (err (Connection_failed { endpoint; attempts = retry_count; last_error }));
-
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
-
Endpoint.pp endpoint attempt retry_count);
+
Log.debug (fun m ->
+
m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
+
retry_count);
try
let addr = resolve_endpoint pool endpoint in
-
Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);
(* Connect with optional timeout *)
let socket =
-
match Config.connect_timeout pool.config with
-
| Some timeout ->
-
Eio.Time.with_timeout_exn pool.clock timeout
-
(fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
-
| None ->
-
Eio.Net.connect ~sw:pool.sw pool.net addr
+
try
+
match Config.connect_timeout pool.config with
+
| Some timeout ->
+
Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
+
Eio.Net.connect ~sw:pool.sw pool.net addr)
+
| None -> Eio.Net.connect ~sw:pool.sw pool.net addr
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint
in
-
Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "TCP connection established to %a" Endpoint.pp endpoint);
-
let flow = match pool.tls with
-
| None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
-
| Some tls_cfg ->
-
Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
-
let host = match Tls_config.servername tls_cfg with
-
| Some name -> Domain_name.(host_exn (of_string_exn name))
-
| None -> Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
-
in
-
let tls_flow = Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket in
-
Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
-
(tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
+
let flow =
+
match pool.tls with
+
| None ->
+
(socket :> connection)
+
| Some tls_config ->
+
try
+
Log.debug (fun m ->
+
m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
+
let host =
+
Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
+
in
+
let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
+
Log.info (fun m ->
+
m "TLS connection established to %a" Endpoint.pp endpoint);
+
(tls_flow :> connection)
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint
in
let now = get_time pool in
···
endpoint;
mutex = Eio.Mutex.create ();
}
-
with
-
| Eio.Time.Timeout as e ->
-
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
-
let error_msg = Printexc.to_string e in
+
| Eio.Time.Timeout ->
+
Log.warn (fun m ->
+
m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
if attempt >= Config.connect_retry_count pool.config then
(* Last attempt - convert to our error type *)
match Config.connect_timeout pool.config with
| Some timeout ->
-
raise (Pool_error (Connection_timeout { endpoint; timeout }))
+
raise (err (Connection_timeout { endpoint; timeout }))
| None ->
-
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
+
raise (err (Connection_failed
+
{ endpoint; attempts = attempt; last_error = "Timeout" }))
else begin
(* Retry with exponential backoff *)
-
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
+
let delay =
+
Config.connect_retry_delay pool.config
+
*. (2.0 ** float_of_int (attempt - 1))
+
in
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1) error_msg
+
create_connection_with_retry pool endpoint (attempt + 1) "Timeout"
end
-
| e ->
-
(* Other errors - retry with backoff *)
-
let error_msg = Printexc.to_string e in
-
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
-
attempt Endpoint.pp endpoint error_msg);
+
| Eio.Io _ as ex ->
+
(* Eio IO errors - retry with backoff and add context on final failure *)
+
let error_msg = Printexc.to_string ex in
+
Log.warn (fun m ->
+
m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
+
endpoint error_msg);
if attempt < Config.connect_retry_count pool.config then (
-
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
+
let delay =
+
Config.connect_retry_delay pool.config
+
*. (2.0 ** float_of_int (attempt - 1))
+
in
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1) error_msg
-
) else
-
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
+
create_connection_with_retry pool endpoint (attempt + 1) error_msg)
+
else
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "after %d retry attempts" attempt
(* Entry point for opening a connection to [endpoint]: delegates to
   [create_connection_with_retry], starting at attempt 1. The string is the
   initial value of the retry function's last argument, which subsequent
   retries replace with the latest error text. *)
let create_connection (pool : ('clock, 'net) internal) endpoint =
  let first_attempt = 1 in
  create_connection_with_retry pool endpoint first_attempt "No attempts made"
···
let age = now -. Connection.created_at conn in
let max_lifetime = Config.max_connection_lifetime pool.config in
if age > max_lifetime then begin
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) age max_lifetime);
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) age max_lifetime);
false
end
-
(* Check idle time *)
-
else begin
+
else begin
let max_idle = Config.max_idle_time pool.config in
-
if (now -. Connection.last_used conn) > max_idle then begin
+
if now -. Connection.last_used conn > max_idle then begin
let idle_time = now -. Connection.last_used conn in
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
false
-
end
-
-
(* Check use count *)
-
else if (match Config.max_connection_uses pool.config with
-
| Some max -> Connection.use_count conn >= max
-
| None -> false) then begin
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
-
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
+
end (* Check use count *)
+
else if
+
match Config.max_connection_uses pool.config with
+
| Some max -> Connection.use_count conn >= max
+
| None -> false
+
then begin
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max use count (%d)"
+
Endpoint.pp (Connection.endpoint conn)
+
(Connection.use_count conn));
false
-
end
-
-
(* Optional: custom health check *)
-
else if (match Config.health_check pool.config with
-
| Some check ->
-
(try
-
let healthy = check (Connection.flow conn) in
-
if not healthy then
-
Log.debug (fun m -> m "Connection to %a failed custom health check"
-
Endpoint.pp (Connection.endpoint conn));
-
not healthy
-
with e ->
-
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
-
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
-
true) (* Exception in health check = unhealthy *)
-
| None -> false) then
-
false
-
-
(* Optional: check if socket still connected *)
+
end (* Optional: custom health check *)
+
else if
+
match Config.health_check pool.config with
+
| Some check -> (
+
try
+
let healthy = check (Connection.flow conn) in
+
if not healthy then
+
Log.debug (fun m ->
+
m "Connection to %a failed custom health check" Endpoint.pp
+
(Connection.endpoint conn));
+
not healthy
+
with e ->
+
Log.debug (fun m ->
+
m "Connection to %a health check raised exception: %s"
+
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
+
true (* Exception in health check = unhealthy *))
+
| None -> false
+
then false (* Optional: check if socket still connected *)
else if check_readable then
try
(* TODO avsm: a sockopt for this? *)
true
-
with
-
| _ -> false
-
+
with _ -> false
else begin
-
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
-
Endpoint.pp (Connection.endpoint conn)
-
age
-
(now -. Connection.last_used conn)
-
(Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
+
Endpoint.pp (Connection.endpoint conn) age
+
(now -. Connection.last_used conn)
+
(Connection.use_count conn));
true
end
end
···
(** {1 Internal Pool Operations} *)
let close_internal (pool : ('clock, 'net) internal) conn =
-
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
-
Endpoint.pp (Connection.endpoint conn)
-
(get_time pool -. Connection.created_at conn)
-
(Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
+
(Connection.endpoint conn)
+
(get_time pool -. Connection.created_at conn)
+
(Connection.use_count conn));
Eio.Cancel.protect (fun () ->
-
try
-
Eio.Flow.close (Connection.flow conn)
-
with _ -> ()
-
);
+
try Eio.Flow.close (Connection.flow conn) with _ -> ());
(* Call hook if configured *)
-
Option.iter (fun f -> f (Connection.endpoint conn)) (Config.on_connection_closed pool.config)
+
Option.iter
+
(fun f -> f (Connection.endpoint conn))
+
(Config.on_connection_closed pool.config)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
-
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
-
(* First try with read lock *)
-
match Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
-
Hashtbl.find_opt pool.endpoints endpoint
-
) with
+
match
+
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
+
Hashtbl.find_opt pool.endpoints endpoint)
+
with
| Some ep_pool ->
-
Log.debug (fun m -> m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
ep_pool
| None ->
-
Log.debug (fun m -> m "No existing pool, need to create for %a" Endpoint.pp endpoint);
(* Need to create - use write lock *)
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
-
(* Check again in case another fiber created it *)
-
match Hashtbl.find_opt pool.endpoints endpoint with
-
| Some ep_pool ->
-
Log.debug (fun m -> m "Another fiber created pool for %a" Endpoint.pp endpoint);
-
ep_pool
-
| None ->
+
(* Check again in case another fiber created it *)
+
match Hashtbl.find_opt pool.endpoints endpoint with
+
| Some ep_pool ->
+
ep_pool
+
| None ->
(* Create new endpoint pool *)
let stats = create_endp_stats () in
let mutex = Eio.Mutex.create () in
-
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
-
Endpoint.pp endpoint (Config.max_connections_per_endpoint pool.config));
+
Log.info (fun m ->
+
m "Creating endpoint pool for %a (max_connections=%d)"
+
Endpoint.pp endpoint
+
(Config.max_connections_per_endpoint pool.config));
-
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
+
let eio_pool =
+
Eio.Pool.create
+
(Config.max_connections_per_endpoint pool.config)
+
~validate:(fun conn ->
+
let healthy = is_healthy pool ~check_readable:false conn in
+
if healthy then (
+
(* Update stats for reuse *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_reused <- stats.total_reused + 1);
-
let eio_pool = Eio.Pool.create
-
(Config.max_connections_per_endpoint pool.config)
-
~validate:(fun conn ->
-
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
-
(* Called before reusing from pool *)
-
let healthy = is_healthy pool ~check_readable:false conn in
+
(* Call hook if configured *)
+
Option.iter
+
(fun f -> f endpoint)
+
(Config.on_connection_reused pool.config);
-
if healthy then (
-
Log.debug (fun m -> m "Reusing connection to %a from pool" Endpoint.pp endpoint);
+
(* Run health check if configured *)
+
match Config.health_check pool.config with
+
| Some check -> (
+
try check (Connection.flow conn) with _ -> false)
+
| None -> true)
+
else
+
false)
+
~dispose:(fun conn ->
+
(* Called when removing from pool *)
+
Eio.Cancel.protect (fun () ->
+
close_internal pool conn;
-
(* Update stats for reuse *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_reused <- stats.total_reused + 1
-
);
+
(* Update stats *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_closed <- stats.total_closed + 1)))
+
(fun () ->
+
try
+
let conn = create_connection pool endpoint in
-
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) (Config.on_connection_reused pool.config);
+
(* Update stats *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_created <- stats.total_created + 1);
-
(* Run health check if configured *)
-
match Config.health_check pool.config with
-
| Some check ->
-
(try check (Connection.flow conn)
-
with _ -> false)
-
| None -> true
-
) else begin
-
Log.debug (fun m -> m "Connection to %a failed validation, creating new one" Endpoint.pp endpoint);
-
false
-
end
-
)
-
~dispose:(fun conn ->
-
(* Called when removing from pool *)
-
Eio.Cancel.protect (fun () ->
-
close_internal pool conn;
+
(* Call hook if configured *)
+
Option.iter
+
(fun f -> f endpoint)
+
(Config.on_connection_created pool.config);
-
(* Update stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_closed <- stats.total_closed + 1
-
)
-
)
-
)
-
(fun () ->
-
Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
-
try
-
let conn = create_connection pool endpoint in
-
-
Log.debug (fun m -> m "Connection created successfully for %a" Endpoint.pp endpoint);
-
-
(* Update stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_created <- stats.total_created + 1
-
);
-
-
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) (Config.on_connection_created pool.config);
-
-
conn
-
with e ->
-
Log.err (fun m -> m "Factory function failed for %a: %s"
-
Endpoint.pp endpoint (Printexc.to_string e));
-
(* Update error stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.errors <- stats.errors + 1
-
);
-
raise e
-
)
+
conn
+
with Eio.Io _ as ex ->
+
(* Eio.Io exceptions already have full context from create_connection.
+
Just update error stats and let the exception propagate. *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.errors <- stats.errors + 1);
+
raise ex)
in
-
Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
-
-
let ep_pool = {
-
pool = eio_pool;
-
stats;
-
mutex;
-
} in
-
+
let ep_pool = { pool = eio_pool; stats; mutex } in
Hashtbl.add pool.endpoints endpoint ep_pool;
-
Log.debug (fun m -> m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
-
ep_pool
-
)
+
ep_pool)
(** {1 Public API - Pool Creation} *)
-
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : t =
-
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
-
(Config.max_connections_per_endpoint config)
-
(Config.max_idle_time config)
-
(Config.max_connection_lifetime config));
+
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
+
?(config = Config.default) () : t =
+
Log.info (fun m ->
+
m
+
"Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
+
max_lifetime=%.1fs)"
+
(Config.max_connections_per_endpoint config)
+
(Config.max_idle_time config)
+
(Config.max_connection_lifetime config));
-
let pool = {
-
sw;
-
net;
-
clock;
-
config;
-
tls;
-
endpoints = Hashtbl.create 16;
-
endpoints_mutex = Eio.Mutex.create ();
-
} in
+
let pool =
+
{
+
sw;
+
net;
+
clock;
+
config;
+
tls;
+
endpoints = Hashtbl.create 16;
+
endpoints_mutex = Eio.Mutex.create ();
+
}
+
in
(* Auto-cleanup on switch release *)
Eio.Switch.on_release sw (fun () ->
-
Eio.Cancel.protect (fun () ->
-
Log.info (fun m -> m "Closing connection pool");
-
(* Close all idle connections - active ones will be cleaned up by switch *)
-
Hashtbl.iter (fun _endpoint _ep_pool ->
-
(* Connections are bound to the switch and will be auto-closed *)
-
()
-
) pool.endpoints;
+
Eio.Cancel.protect (fun () ->
+
Log.info (fun m -> m "Closing connection pool");
+
(* Close all idle connections - active ones will be cleaned up by switch *)
+
Hashtbl.iter
+
(fun _endpoint _ep_pool ->
+
(* Connections are bound to the switch and will be auto-closed *)
+
())
+
pool.endpoints;
-
Hashtbl.clear pool.endpoints
-
)
-
);
+
Hashtbl.clear pool.endpoints));
T pool
(** {1 Public API - Connection Management} *)
-
let with_connection (T pool) endpoint f =
+
let connection_internal ~sw (T pool) endpoint =
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
let ep_pool = get_or_create_endpoint_pool pool endpoint in
+
+
(* Create promises for connection handoff and cleanup signal *)
+
let conn_promise, conn_resolver = Eio.Promise.create () in
+
let done_promise, done_resolver = Eio.Promise.create () in
(* Increment active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.active <- ep_pool.stats.active + 1
-
);
+
ep_pool.stats.active <- ep_pool.stats.active + 1);
+
+
(* Fork a daemon fiber to manage the connection lifecycle.
+
Important: Fork under pool.sw, not the caller's sw, so the daemon
+
survives when the caller's switch ends and can return the connection
+
to the pool for reuse. *)
+
Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
+
Fun.protect
+
~finally:(fun () ->
+
(* Decrement active count *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.active <- ep_pool.stats.active - 1);
+
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
+
(fun () ->
+
(* Use Eio.Pool for resource management *)
+
Eio.Pool.use ep_pool.pool (fun conn ->
+
Log.debug (fun m ->
+
m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
+
(Connection.use_count conn));
+
+
(* Update last used time and use count *)
+
Connection.update_usage conn ~now:(get_time pool);
+
+
(* Update idle stats (connection taken from idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
+
+
(* Hand off connection to caller *)
+
Eio.Promise.resolve conn_resolver conn.flow;
-
Fun.protect
-
~finally:(fun () ->
-
(* Decrement active count *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.active <- ep_pool.stats.active - 1
-
);
-
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)
-
)
-
(fun () ->
-
(* Use Eio.Pool for resource management *)
-
Eio.Pool.use ep_pool.pool (fun conn ->
-
Log.debug (fun m -> m "Using connection to %a (uses=%d)"
-
Endpoint.pp endpoint (Connection.use_count conn));
+
try
+
(* Wait for switch to signal cleanup *)
+
Eio.Promise.await done_promise;
+
+
(* Success - connection will be returned to pool by Eio.Pool *)
+
(* Update idle stats (connection returned to idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- ep_pool.stats.idle + 1);
-
(* Update last used time and use count *)
-
Connection.update_usage conn ~now:(get_time pool);
+
`Stop_daemon
+
with e ->
+
(* Error during connection usage - close so it won't be reused.
+
The exception already has context from where it was raised. *)
+
close_internal pool conn;
-
(* Update idle stats (connection taken from idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)
-
);
+
(* Update error stats *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.errors <- ep_pool.stats.errors + 1);
-
try
-
let result = f conn.flow in
+
raise e)));
-
(* Success - connection will be returned to pool by Eio.Pool *)
-
(* Update idle stats (connection returned to idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- ep_pool.stats.idle + 1
-
);
+
(* Signal cleanup when switch ends *)
+
Eio.Switch.on_release sw (fun () ->
+
Eio.Promise.resolve done_resolver ());
-
result
-
with e ->
-
(* Error - close connection so it won't be reused *)
-
Log.warn (fun m -> m "Error using connection to %a: %s"
-
Endpoint.pp endpoint (Printexc.to_string e));
-
close_internal pool conn;
+
(* Return the connection *)
+
Eio.Promise.await conn_promise
-
(* Update error stats *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.errors <- ep_pool.stats.errors + 1
-
);
+
(* Public name for [connection_internal]; arguments are passed through
+
   unchanged ([~sw], the pool, then the endpoint). *)
+
let connection = connection_internal
-
raise e
-
)
-
)
+
(* Scoped variant of [connection]: opens a fresh switch, acquires a
+
   connection under it and hands that connection to [f]. The result of [f]
+
   is the result of the whole call. *)
+
let with_connection t endpoint f =
+
  Eio.Switch.run (fun sw ->
+
      let conn = connection ~sw t endpoint in
+
      f conn)
(** {1 Public API - Statistics} *)
let stats (T pool) endpoint =
match Hashtbl.find_opt pool.endpoints endpoint with
| Some ep_pool ->
-
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
-
snapshot_stats ep_pool.stats
-
)
+
Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
| None ->
(* No pool for this endpoint yet *)
-
Stats.make
-
~active:0
-
~idle:0
-
~total_created:0
-
~total_reused:0
-
~total_closed:0
-
~errors:0
+
Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
+
~total_closed:0 ~errors:0
let all_stats (T pool) =
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
-
Hashtbl.fold (fun endpoint ep_pool acc ->
-
let stats = Eio.Mutex.use_ro ep_pool.mutex (fun () ->
-
snapshot_stats ep_pool.stats
-
) in
-
(endpoint, stats) :: acc
-
) pool.endpoints []
-
)
+
Hashtbl.fold
+
(fun endpoint ep_pool acc ->
+
let stats =
+
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
+
snapshot_stats ep_pool.stats)
+
in
+
(endpoint, stats) :: acc)
+
pool.endpoints [])
(** {1 Public API - Pool Management} *)
···
match Hashtbl.find_opt pool.endpoints endpoint with
| Some _ep_pool ->
Eio.Cancel.protect (fun () ->
-
(* Remove endpoint pool from hashtable *)
-
(* Idle connections will be discarded *)
-
(* Active connections will be closed when returned *)
-
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
-
Hashtbl.remove pool.endpoints endpoint
-
)
-
)
+
(* Remove endpoint pool from hashtable *)
+
(* Idle connections will be discarded *)
+
(* Active connections will be closed when returned *)
+
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
+
Hashtbl.remove pool.endpoints endpoint))
| None ->
-
Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)
+
Log.debug (fun m ->
+
m "No endpoint pool found for %a" Endpoint.pp endpoint)
+102 -63
lib/conpool.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
(** {1 Logging} *)
···
(** Logs source for the main connection pool. Configure logging with:
{[
Logs.Src.set_level Conpool.src (Some Logs.Debug);
-
Logs.set_reporter (Logs_fmt.reporter ());
+
Logs.set_reporter (Logs_fmt.reporter ())
]}
Each submodule also exposes its own log source for fine-grained control:
- {!Endpoint.src} - endpoint operations
-
- {!Tls_config.src} - TLS configuration
-
- {!Config.src} - pool configuration
-
*)
+
- {!Config.src} - pool configuration *)
(** {1 Core Types} *)
+
module Endpoint = Endpoint
(** Network endpoint representation *)
-
module Endpoint : module type of Endpoint
-
-
(** TLS configuration for connection pools *)
-
module Tls_config : module type of Tls_config
+
module Config = Config
(** Configuration for connection pools *)
-
module Config : module type of Config
+
module Stats = Stats
(** Statistics for connection pool endpoints *)
-
module Stats : module type of Stats
+
module Cmd = Cmd
(** Cmdliner terms for connection pool configuration *)
-
module Cmd : module type of Cmd
(** {1 Errors} *)
type error =
| Dns_resolution_failed of { hostname : string }
-
(** DNS resolution failed for the given hostname *)
+
(** DNS resolution failed for the given hostname *)
+
| Connection_failed of {
+
endpoint : Endpoint.t;
+
attempts : int;
+
last_error : string;
+
} (** Failed to establish connection after all retry attempts *)
+
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
+
(** Connection attempt timed out *)
+
| Invalid_config of string (** Invalid configuration parameter *)
+
| Invalid_endpoint of string (** Invalid endpoint specification *)
-
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
-
(** Failed to establish connection after all retry attempts *)
+
type Eio.Exn.err += E of error
+
(** Extension of Eio's error type for connection pool errors.
-
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
-
(** Connection attempt timed out *)
+
Pool operations raise [Eio.Io] exceptions with context information added at
+
each layer. The innermost error is often [E error], wrapped with context
+
strings that describe the operation being performed.
-
| Invalid_config of string
-
(** Invalid configuration parameter *)
+
Example error message:
+
{[
+
Eio.Io Conpool Dns_resolution_failed { hostname = "invalid.example" },
+
resolving invalid.example:443,
+
connecting to invalid.example:443,
+
after 3 retry attempts
+
]}
-
| Invalid_endpoint of string
-
(** Invalid endpoint specification *)
+
Use {!pp_error} to format just the error code, or let Eio format the full
+
exception with context. *)
-
exception Pool_error of error
-
(** Exception raised by pool operations.
+
val err : error -> exn
+
(** [err e] is [Eio.Exn.create (E e)].
-
Most pool operations can raise this exception. Use {!pp_error} to get
-
human-readable error messages. *)
+
This converts a connection pool error to an Eio exception, allowing it to
+
be handled uniformly with other Eio I/O errors and enabling context to be
+
added via [Eio.Exn.reraise_with_context]. *)
val pp_error : error Fmt.t
-
(** Pretty-printer for error values. *)
+
(** Pretty-printer for error values (without context).
+
+
For full error messages including context, use [Eio.Exn.pp] or simply let
+
the exception be printed naturally. *)
+
+
(** {1 Connection Types} *)
+
+
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
+
(** The type tags for a pooled connection.
+
Connections support reading, writing, shutdown, and closing. *)
+
+
type connection = connection_ty Eio.Resource.t
+
(** A connection resource from the pool. *)
(** {1 Connection Pool} *)
···
sw:Eio.Switch.t ->
net:'net Eio.Net.t ->
clock:'clock Eio.Time.clock ->
-
?tls:Tls_config.t ->
+
?tls:Tls.Config.client ->
?config:Config.t ->
-
unit -> t
-
(** Create connection pool bound to switch.
-
All connections will be closed when switch is released.
+
unit ->
+
t
+
(** Create a connection pool bound to [sw]. All connections will be closed when
+
the switch is released.
@param sw Switch for resource management
@param net Network interface for creating connections
@param clock Clock for timeouts and time-based validation
-
@param tls Optional TLS configuration applied to all connections
-
@param config Optional pool configuration (uses Config.default if not provided) *)
+
@param tls
+
Optional TLS client configuration applied to all connections. SNI
+
servername is automatically set to the endpoint's hostname.
+
@param config
+
Optional pool configuration (uses Config.default if not provided) *)
(** {1 Connection Usage} *)
-
val with_connection :
-
t ->
-
Endpoint.t ->
-
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> 'a) ->
-
'a
-
(** Acquire connection, use it, automatically release back to pool.
+
val connection : sw:Eio.Switch.t -> t -> Endpoint.t -> connection
+
(** [connection ~sw pool endpoint] acquires a connection from the pool.
+
+
The connection is automatically returned to the pool when [sw] finishes.
+
If the connection becomes unhealthy or an error occurs during use, it is
+
closed instead of being returned to the pool.
-
If idle connection available and healthy:
-
- Reuse from pool (validates health first)
-
Else:
-
- Create new connection (may block if endpoint at limit)
+
If an idle connection is available and healthy:
+
- Reuse from pool (validates health first)
-
On success: connection returned to pool for reuse
-
On error: connection closed, not returned to pool
+
Otherwise:
+
- Create new connection (may block if endpoint at limit)
Example:
{[
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
-
Conpool.with_connection pool endpoint (fun conn ->
-
(* Use conn for HTTP request, Redis command, etc. *)
-
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
-
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
-
Eio.Buf_read.take_all buf
-
)
+
Eio.Switch.run (fun sw ->
+
let conn = Conpool.connection ~sw pool endpoint in
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf)
+
]} *)
+
+
val with_connection : t -> Endpoint.t -> (connection -> 'a) -> 'a
+
(** [with_connection pool endpoint fn] is a convenience wrapper around
+
{!val:connection}.
+
+
Equivalent to:
+
{[
+
Eio.Switch.run (fun sw -> fn (connection ~sw pool endpoint))
]}
-
*)
+
+
Example:
+
{[
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
+
Conpool.with_connection pool endpoint (fun conn ->
+
(* Use conn for HTTP request, Redis command, etc. *)
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf)
+
]} *)
(** {1 Statistics & Monitoring} *)
-
val stats :
-
t ->
-
Endpoint.t ->
-
Stats.t
+
val stats : t -> Endpoint.t -> Stats.t
(** Get statistics for a specific endpoint. *)
-
val all_stats :
-
t ->
-
(Endpoint.t * Stats.t) list
+
val all_stats : t -> (Endpoint.t * Stats.t) list
(** Get statistics for all endpoints in the pool. *)
(** {1 Pool Management} *)
-
val clear_endpoint :
-
t ->
-
Endpoint.t ->
-
unit
+
val clear_endpoint : t -> Endpoint.t -> unit
(** Clear all cached connections for a specific endpoint.
This removes the endpoint from the pool, discarding all idle connections.
Active connections will continue to work but won't be returned to the pool.
-
Use this when you know an endpoint's connections are no longer valid
-
(e.g., server restarted, network reconfigured, credentials changed).
+
Use this when you know an endpoint's connections are no longer valid (e.g.,
+
server restarted, network reconfigured, credentials changed).
The pool will be automatically cleaned up when its switch is released. *)
+14 -16
lib/endpoint.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Network endpoint representation *)
-
let src = Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
+
let src =
+
Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
+
module Log = (val Logs.src_log src : Logs.LOG)
-
type t = {
-
host : string;
-
port : int;
-
}
+
type t = { host : string; port : int }
let make ~host ~port =
(* Validate port range *)
if port < 1 || port > 65535 then
-
invalid_arg (Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
+
invalid_arg
+
(Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
(* Validate hostname is not empty *)
-
if String.trim host = "" then
-
invalid_arg "Hostname cannot be empty";
+
if String.trim host = "" then invalid_arg "Hostname cannot be empty";
-
Log.debug (fun m -> m "Creating endpoint: %s:%d" host port);
{ host; port }
(* Field accessors for the endpoint record. *)
let host { host; _ } = host
let port { port; _ } = port
-
-
let equal t1 t2 =
-
String.equal t1.host t2.host && t1.port = t2.port
-
-
let hash t =
-
Hashtbl.hash (t.host, t.port)
-
+
(* Two endpoints are equal when both their host and their port match. *)
+
let equal a b = Int.equal a.port b.port && String.equal a.host b.host
+
(* Hash of the (host, port) pair, so equal endpoints hash identically. *)
+
let hash { host; port } = Hashtbl.hash (host, port)
(* Prints an endpoint as "host:port". *)
let pp =
  let to_string { host; port } = Printf.sprintf "%s:%d" host port in
  Fmt.of_to_string to_string
+7 -3
lib/endpoint.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Network endpoint representation *)
(** {1 Logging} *)
···
val src : Logs.Src.t
(** Logs source for endpoint operations. Configure logging with:
{[
-
Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug);
-
]}
-
*)
+
Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug)
+
]} *)
(** {1 Type} *)
+6 -6
lib/stats.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Statistics for connection pool endpoints *)
type t = {
···
- Reused: %d@,\
- Closed: %d@,\
- Errors: %d@]"
-
t.active
-
t.idle
-
t.total_created
-
t.total_reused
-
t.total_closed
-
t.errors
+
t.active t.idle t.total_created t.total_reused t.total_closed t.errors
+5
lib/stats.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Statistics for connection pool endpoints *)
(** {1 Type} *)
-22
lib/tls_config.ml
···
-
(** TLS configuration for connection pools *)
-
-
let src = Logs.Src.create "conpool.tls" ~doc:"Connection pool TLS configuration"
-
module Log = (val Logs.src_log src : Logs.LOG)
-
-
type t = {
-
config : Tls.Config.client;
-
servername : string option;
-
}
-
-
let make ~config ?servername () =
-
Log.debug (fun m ->
-
m "Creating TLS config with servername: %s"
-
(match servername with Some s -> s | None -> "<default>"));
-
{ config; servername }
-
-
let config t = t.config
-
let servername t = t.servername
-
-
let pp ppf t =
-
Fmt.pf ppf "TLS(servername=%s)"
-
(match t.servername with Some s -> s | None -> "<default>")
-37
lib/tls_config.mli
···
-
(** TLS configuration for connection pools *)
-
-
(** {1 Logging} *)
-
-
val src : Logs.Src.t
-
(** Logs source for TLS configuration operations. Configure logging with:
-
{[
-
Logs.Src.set_level Conpool.Tls_config.src (Some Logs.Debug);
-
]}
-
*)
-
-
(** {1 Type} *)
-
-
type t
-
(** TLS configuration applied to all connections in a pool *)
-
-
(** {1 Construction} *)
-
-
val make : config:Tls.Config.client -> ?servername:string -> unit -> t
-
(** Create TLS configuration.
-
-
@param config TLS client configuration for all connections
-
@param servername Optional SNI server name override. If [None], uses the endpoint's hostname
-
*)
-
-
(** {1 Accessors} *)
-
-
val config : t -> Tls.Config.client
-
(** Get the TLS client configuration. *)
-
-
val servername : t -> string option
-
(** Get the SNI server name override, if any. *)
-
-
(** {1 Pretty-printing} *)
-
-
val pp : t Fmt.t
-
(** Pretty-printer for TLS configuration. *)
+9
test/dune
···
+
; Stress-test executable, built from the stress_test module only and linked
+
; against conpool, eio, eio_main and unix.
+
(executable
+
(name stress_test)
+
(modules stress_test)
+
(libraries conpool eio eio_main unix))
+
+
; Attached to the runtest alias: runs the built executable with --all.
+
; NOTE(review): -o presumably names the results file written by the test;
+
; confirm against the command-line handling in stress_test.ml.
+
(rule
+
(alias runtest)
+
(deps stress_test.exe)
+
(action (run ./stress_test.exe --all -o stress_test_results.json)))
+993
test/stress_test.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
+
(** Stress test framework for conpool
+
+
Spawns a variable number of echo servers on random ports, then exercises
+
the connection pool with multiple parallel client fibers.
+
Collects detailed event traces for visualization.
+
*)
+
+
(** Parameters describing one stress-test scenario. *)
type config = {
  name : string;  (** Label identifying the scenario in output *)
  num_servers : int;  (** How many echo servers are started *)
  num_clients : int;
      (** Clients per server; the run uses [num_servers * num_clients]
          clients in total *)
  messages_per_client : int;  (** Messages sent by each client *)
  max_parallel_clients : int;
      (** Upper bound on concurrently running client fibers *)
  message_size : int;
      (** Length in bytes of the random part of each message *)
  pool_size : int;  (** Connection limit per endpoint *)
}
+
+
(** Scenario used when no preset is selected; its fields also provide the
    defaults of the command-line options. *)
let default_config : config =
  {
    name = "default";
    pool_size = 5;
    message_size = 64;
    max_parallel_clients = 20;
    messages_per_client = 5;
    num_clients = 10;
    num_servers = 3;
  }
+
+
(** Named scenarios, each aimed at a different usage pattern of the pool. *)
let presets =
  (* Local constructor so that each scenario fits on a couple of lines. *)
  let scenario name ~servers ~clients ~messages ~parallel ~size ~pool =
    {
      name;
      num_servers = servers;
      num_clients = clients;
      messages_per_client = messages;
      max_parallel_clients = parallel;
      message_size = size;
      pool_size = pool;
    }
  in
  [
    (* High connection reuse: few connections, many messages *)
    scenario "high_reuse" ~servers:2 ~clients:20 ~messages:50 ~parallel:10
      ~size:32 ~pool:3;
    (* Many endpoints: endpoint scaling *)
    scenario "many_endpoints" ~servers:10 ~clients:10 ~messages:10
      ~parallel:50 ~size:64 ~pool:5;
    (* High concurrency: many parallel connections *)
    scenario "high_concurrency" ~servers:3 ~clients:100 ~messages:5
      ~parallel:100 ~size:64 ~pool:20;
    (* Large messages: throughput *)
    scenario "large_messages" ~servers:3 ~clients:20 ~messages:20
      ~parallel:30 ~size:1024 ~pool:10;
    (* Constrained pool: forces queuing *)
    scenario "constrained_pool" ~servers:2 ~clients:50 ~messages:10
      ~parallel:50 ~size:64 ~pool:2;
    (* Burst traffic: many clients, few messages each *)
    scenario "burst_traffic" ~servers:5 ~clients:200 ~messages:2
      ~parallel:100 ~size:32 ~pool:15;
  ]
+
+
(** Large scenario: 30 servers, 1000 clients per server and 100 messages per
    client, i.e. 3,000,000 messages in total. *)
let extended_preset : config =
  {
    name = "extended_stress";
    pool_size = 50;
    message_size = 128;
    max_parallel_clients = 500;
    messages_per_client = 100;
    num_clients = 1000;
    num_servers = 30;
  }
+
+
(** Mutable accumulator of latency samples gathered while a test runs. *)
type latency_stats = {
  mutable count : int;  (** Number of samples recorded *)
  mutable total : float;  (** Sum of all recorded latencies *)
  mutable min : float;  (** Smallest recorded latency *)
  mutable max : float;  (** Largest recorded latency *)
  mutable latencies : (float * float) list;
      (** [(timestamp, latency)] pairs, most recent first *)
}
+
+
(** [create_latency_stats ()] is a fresh accumulator holding no samples:
    [min] starts at infinity and [max] at zero. *)
let create_latency_stats () =
  { latencies = []; max = 0.0; min = infinity; total = 0.0; count = 0 }
+
+
(** [update_latency stats latency timestamp] records one sample in [stats]:
    the pair is pushed onto [latencies] and the count, total, minimum and
    maximum are updated. *)
let update_latency stats latency timestamp =
  let sample = (timestamp, latency) in
  stats.latencies <- sample :: stats.latencies;
  stats.max <- max stats.max latency;
  stats.min <- min stats.min latency;
  stats.total <- stats.total +. latency;
  stats.count <- succ stats.count
+
+
(** [generate_message size] is a string of [size] characters, each drawn
    with [Random.int] from the ASCII letters and digits. *)
let generate_message size =
  let alphabet =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  in
  let pick _ = String.get alphabet (Random.int (String.length alphabet)) in
  String.init size pick
+
+
(** Connection handler of the echo server: each chunk read from [flow] is
    written straight back, until the read side reports end of file. The
    peer address is ignored. *)
let handle_echo_client flow _addr =
  let chunk = Cstruct.create 4096 in
  let rec echo () =
    match Eio.Flow.single_read flow chunk with
    | exception End_of_file -> ()
    | received ->
        Eio.Flow.write flow [ Cstruct.sub chunk 0 received ];
        echo ()
  in
  echo ()
+
+
(** [start_echo_server ~sw net] listens on the IPv4 loopback address with
    port 0, forks a daemon fiber on [sw] that accepts connections and hands
    each one to {!handle_echo_client}, and returns the port reported by the
    listening socket.

    @raise Failure if the listening address is not a TCP address. *)
let start_echo_server ~sw net =
  let socket =
    Eio.Net.listen net ~sw ~backlog:128 ~reuse_addr:true
      (`Tcp (Eio.Net.Ipaddr.V4.loopback, 0))
  in
  let port =
    match Eio.Net.listening_addr socket with
    | `Tcp (_, bound_port) -> bound_port
    | _ -> failwith "Expected TCP address"
  in
  (* Accept loop; errors raised by connection handlers are ignored. *)
  let rec accept_forever () =
    Eio.Net.accept_fork ~sw socket ~on_error:ignore handle_echo_client;
    accept_forever ()
  in
  Eio.Fiber.fork_daemon ~sw (fun () ->
      try accept_forever () with Eio.Cancel.Cancelled _ -> `Stop_daemon);
  port
+
+
(** [run_client_test ~clock ~test_start_time pool endpoint message
    latency_stats errors] performs one echo round trip through the pool.

    [message] followed by a newline is sent on a pooled connection to
    [endpoint], and one line is read back. If the line equals [message], the
    latency in milliseconds (measured from before the connection was
    requested) is recorded in [latency_stats], together with the completion
    time in milliseconds relative to [test_start_time]. A different echo, or
    any exception other than cancellation, increments [errors].

    @raise Eio.Cancel.Cancelled
      if the fiber is cancelled; cancellation is propagated rather than
      counted as a test error. *)
let run_client_test ~clock ~test_start_time pool endpoint message latency_stats
    errors =
  let msg_len = String.length message in
  let start_time = Eio.Time.now clock in
  try
    Conpool.with_connection pool endpoint (fun flow ->
        (* Send message *)
        Eio.Flow.copy_string message flow;
        Eio.Flow.copy_string "\n" flow;

        (* Read echo response: the message plus its newline *)
        let response = Eio.Buf_read.of_flow flow ~max_size:(msg_len + 1) in
        let echoed = Eio.Buf_read.line response in

        let end_time = Eio.Time.now clock in
        (* Both values are converted to milliseconds *)
        let latency = (end_time -. start_time) *. 1000.0 in
        let relative_time = (end_time -. test_start_time) *. 1000.0 in

        if String.equal echoed message then
          update_latency latency_stats latency relative_time
        else incr errors)
  with
  | Eio.Cancel.Cancelled _ as cancelled ->
      (* Swallowing a cancellation would keep a cancelled fiber running. *)
      raise cancelled
  | _ex -> incr errors
+
+
(** [run_client ...] sends [cfg.messages_per_client] messages one after the
    other. Each message goes to an endpoint picked at random from
    [endpoints] and consists of the prefix [c<client_id>-] followed by a
    random payload of [cfg.message_size] characters. *)
let run_client ~clock ~test_start_time pool endpoints (cfg : config)
    latency_stats errors client_id =
  let send_one () =
    let endpoint = endpoints.(Random.int (Array.length endpoints)) in
    let payload = generate_message cfg.message_size in
    let message = Printf.sprintf "c%d-%s" client_id payload in
    run_client_test ~clock ~test_start_time pool endpoint message latency_stats
      errors
  in
  let rec repeat remaining =
    if remaining > 0 then begin
      send_one ();
      repeat (remaining - 1)
    end
  in
  repeat cfg.messages_per_client
+
+
(** Pool counters summed over every endpoint. *)
type pool_stats = {
  total_created : int;  (** Sum of [Conpool.Stats.total_created] *)
  total_reused : int;  (** Sum of [Conpool.Stats.total_reused] *)
  total_closed : int;  (** Sum of [Conpool.Stats.total_closed] *)
  active : int;  (** Sum of [Conpool.Stats.active] *)
  idle : int;  (** Sum of [Conpool.Stats.idle] *)
  pool_errors : int;  (** Sum of [Conpool.Stats.errors] *)
}
+
+
(** Outcome of one stress-test scenario. *)
type test_result = {
  test_name : string;  (** Name of the scenario that was run *)
  num_servers : int;  (** Copied from the scenario configuration *)
  num_clients : int;  (** Copied from the scenario configuration *)
  messages_per_client : int;  (** Copied from the scenario configuration *)
  pool_size : int;  (** Copied from the scenario configuration *)
  duration : float;  (** Clock time spent running the clients *)
  total_messages : int;  (** Messages whose echo matched *)
  total_errors : int;  (** Messages that failed or echoed incorrectly *)
  throughput : float;  (** [total_messages] divided by [duration] *)
  avg_latency : float;  (** Mean latency in milliseconds *)
  min_latency : float;  (** Smallest latency in milliseconds *)
  max_latency : float;  (** Largest latency in milliseconds *)
  latency_data : (float * float) list;
      (** [(timestamp, latency)] pairs used for the charts *)
  pool_stats : pool_stats;  (** Pool counters summed over all endpoints *)
}
+
+
(** [run_stress_test ~env cfg] runs one scenario and returns its result.

    Inside a single switch it starts [cfg.num_servers] echo servers, builds
    a pool limited to [cfg.pool_size] connections per endpoint, runs
    [cfg.num_servers * cfg.num_clients] clients with at most
    [cfg.max_parallel_clients] fibers at a time, and then aggregates the
    latency samples and the per-endpoint pool statistics.

    The throughput is reported as [0.0] when the measured duration is not
    positive, so that no infinite or NaN value reaches the reports.

    @raise Failure if the switch terminates without producing a result. *)
let run_stress_test ~env (cfg : config) : test_result =
  let net = Eio.Stdenv.net env in
  let clock = Eio.Stdenv.clock env in

  let latency_stats = create_latency_stats () in
  let errors = ref 0 in
  let result : test_result option ref = ref None in

  begin
    try
      Eio.Switch.run @@ fun sw ->
      (* Start echo servers *)
      let ports =
        Array.init cfg.num_servers (fun _ -> start_echo_server ~sw net)
      in

      (* NOTE(review): presumably gives the servers time to start accepting *)
      Eio.Time.sleep clock 0.05;

      let endpoints =
        Array.map
          (fun port -> Conpool.Endpoint.make ~host:"127.0.0.1" ~port)
          ports
      in

      (* Create connection pool *)
      let pool_config =
        Conpool.Config.make ~max_connections_per_endpoint:cfg.pool_size
          ~max_idle_time:30.0 ~max_connection_lifetime:120.0
          ~connect_timeout:5.0 ~connect_retry_count:3 ()
      in
      let pool = Conpool.create ~sw ~net ~clock ~config:pool_config () in

      (* Record start time *)
      let start_time = Eio.Time.now clock in

      (* Run clients in parallel *)
      let total_clients = cfg.num_servers * cfg.num_clients in
      let client_ids = List.init total_clients (fun i -> i) in
      Eio.Fiber.List.iter ~max_fibers:cfg.max_parallel_clients
        (fun client_id ->
          run_client ~clock ~test_start_time:start_time pool endpoints cfg
            latency_stats errors client_id)
        client_ids;

      let end_time = Eio.Time.now clock in
      let duration = end_time -. start_time in

      (* Collect pool statistics from all endpoints *)
      let pool_stats =
        List.fold_left
          (fun acc (_, stats) ->
            {
              total_created =
                acc.total_created + Conpool.Stats.total_created stats;
              total_reused =
                acc.total_reused + Conpool.Stats.total_reused stats;
              total_closed =
                acc.total_closed + Conpool.Stats.total_closed stats;
              active = acc.active + Conpool.Stats.active stats;
              idle = acc.idle + Conpool.Stats.idle stats;
              pool_errors = acc.pool_errors + Conpool.Stats.errors stats;
            })
          {
            total_created = 0;
            total_reused = 0;
            total_closed = 0;
            active = 0;
            idle = 0;
            pool_errors = 0;
          }
          (Conpool.all_stats pool)
      in

      (* Build result *)
      let messages = latency_stats.count in
      let r : test_result =
        {
          test_name = cfg.name;
          num_servers = cfg.num_servers;
          num_clients = cfg.num_clients;
          messages_per_client = cfg.messages_per_client;
          pool_size = cfg.pool_size;
          duration;
          total_messages = messages;
          total_errors = !errors;
          (* Guard the division: a zero duration would give inf or nan *)
          throughput =
            (if duration > 0.0 then float_of_int messages /. duration
             else 0.0);
          avg_latency =
            (if messages > 0 then latency_stats.total /. float_of_int messages
             else 0.0);
          min_latency = (if messages > 0 then latency_stats.min else 0.0);
          max_latency = latency_stats.max;
          latency_data = List.rev latency_stats.latencies;
          pool_stats;
        }
      in
      result := Some r;

      (* NOTE(review): failing the switch with Exit presumably cancels the
         fibers still attached to it so that Switch.run can return; the
         Exit exception is absorbed just below. *)
      Eio.Switch.fail sw Exit
    with Exit -> ()
  end;

  match !result with
  | Some r -> r
  | None -> failwith "Test failed to produce result"
+
+
(** [result_to_json result] renders the scalar fields of [result] as a JSON
    object. The test name is escaped so that quotes, backslashes and control
    characters in it cannot produce invalid JSON. The latency samples, the
    pool size and the pool statistics are not included. *)
let result_to_json (result : test_result) =
  (* Escape the characters that may not appear raw in a JSON string. *)
  let json_escape s =
    let buf = Buffer.create (String.length s) in
    String.iter
      (fun c ->
        match c with
        | '"' -> Buffer.add_string buf "\\\""
        | '\\' -> Buffer.add_string buf "\\\\"
        | '\n' -> Buffer.add_string buf "\\n"
        | '\r' -> Buffer.add_string buf "\\r"
        | '\t' -> Buffer.add_string buf "\\t"
        | c ->
            if Char.code c < 0x20 then
              Buffer.add_string buf (Printf.sprintf "\\u%04x" (Char.code c))
            else Buffer.add_char buf c)
      s;
    Buffer.contents buf
  in
  Printf.sprintf {|{
"test_name": "%s",
"num_servers": %d,
"num_clients": %d,
"messages_per_client": %d,
"duration": %.3f,
"total_messages": %d,
"total_errors": %d,
"throughput": %.2f,
"avg_latency": %.2f,
"min_latency": %.2f,
"max_latency": %.2f
}|}
    (json_escape result.test_name)
    result.num_servers
    result.num_clients
    result.messages_per_client
    result.duration
    result.total_messages
    result.total_errors
    result.throughput
    result.avg_latency
    result.min_latency
    result.max_latency
+
+
(** [js_escape s] is [s] with backslashes, double quotes, newlines, carriage
    returns and tabs replaced by their backslash escape sequences, for use
    inside a double-quoted JavaScript string literal. Every other character
    is copied unchanged. *)
let js_escape s =
  let out = Buffer.create (String.length s) in
  let emit = function
    | '\\' -> Buffer.add_string out "\\\\"
    | '"' -> Buffer.add_string out "\\\""
    | '\n' -> Buffer.add_string out "\\n"
    | '\r' -> Buffer.add_string out "\\r"
    | '\t' -> Buffer.add_string out "\\t"
    | plain -> Buffer.add_char out plain
  in
  for i = 0 to String.length s - 1 do
    emit s.[i]
  done;
  Buffer.contents out
+
+
(** [calculate_histogram latencies num_buckets] splits the latency values
    (the second components of [latencies]) into [num_buckets] buckets of
    equal width and returns [(labels, counts)]. Each label is the lower
    bound of its bucket printed with two decimals.

    The result is [([], [])] when [latencies] is empty or [num_buckets] is
    not positive. When all values are equal the bucket width is zero; every
    sample is then counted in the first bucket instead of dividing by
    zero. *)
let calculate_histogram latencies num_buckets =
  if num_buckets <= 0 then ([], [])
  else
    match List.map snd latencies with
    | [] -> ([], [])
    | latency_values ->
        let min_lat = List.fold_left min Float.infinity latency_values in
        let max_lat = List.fold_left max 0.0 latency_values in
        let bucket_width =
          (max_lat -. min_lat) /. float_of_int num_buckets
        in
        let bucket_of lat =
          if bucket_width > 0.0 then
            let raw = int_of_float ((lat -. min_lat) /. bucket_width) in
            (* The maximum value lands exactly on the upper edge: clamp it
               into the last bucket. *)
            max 0 (min (num_buckets - 1) raw)
          else 0
        in
        let buckets = Array.make num_buckets 0 in
        List.iter
          (fun lat ->
            let idx = bucket_of lat in
            buckets.(idx) <- buckets.(idx) + 1)
          latency_values;
        let bucket_labels =
          List.init num_buckets (fun i ->
              Printf.sprintf "%.2f" (min_lat +. (float_of_int i *. bucket_width)))
        in
        (bucket_labels, Array.to_list buckets)
+
+
(** [generate_html_report results] renders [results] as a single HTML page.

    The page has a summary (number of tests, messages, errors, total
    duration), three comparison bar charts (throughput, average latency,
    error rate) and, for each test, its metrics plus a latency histogram
    with 20 buckets and a latency timeline sampled down to about 500
    points. Charts are drawn with Chart.js, loaded from a CDN.

    Compared with the earlier version: the reuse rate is printed with a
    single percent sign, the message total is grouped in thousands at every
    magnitude, the test name is HTML-escaped where it is used as element
    text, and the error rate is computed over all attempted messages
    (successes plus errors). *)
let generate_html_report results =
  (* Escape text placed in HTML element content. *)
  let html_escape s =
    let buf = Buffer.create (String.length s) in
    String.iter
      (fun c ->
        match c with
        | '&' -> Buffer.add_string buf "&amp;"
        | '<' -> Buffer.add_string buf "&lt;"
        | '>' -> Buffer.add_string buf "&gt;"
        | '"' -> Buffer.add_string buf "&quot;"
        | '\'' -> Buffer.add_string buf "&#39;"
        | c -> Buffer.add_char buf c)
      s;
    Buffer.contents buf
  in
  (* Decimal rendering of [n] with a comma between groups of three digits. *)
  let group_thousands n =
    let digits = string_of_int (abs n) in
    let len = String.length digits in
    let buf = Buffer.create (len + (len / 3) + 1) in
    if n < 0 then Buffer.add_char buf '-';
    String.iteri
      (fun i c ->
        if i > 0 && (len - i) mod 3 = 0 then Buffer.add_char buf ',';
        Buffer.add_char buf c)
      digits;
    Buffer.contents buf
  in

  let timestamp = Unix.time () |> Unix.gmtime in
  let date_str =
    Printf.sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC"
      (timestamp.Unix.tm_year + 1900)
      (timestamp.Unix.tm_mon + 1)
      timestamp.Unix.tm_mday timestamp.Unix.tm_hour timestamp.Unix.tm_min
      timestamp.Unix.tm_sec
  in

  (* Calculate summary statistics *)
  let total_messages =
    List.fold_left (fun acc r -> acc + r.total_messages) 0 results
  in
  let total_errors =
    List.fold_left (fun acc r -> acc + r.total_errors) 0 results
  in
  let total_duration =
    List.fold_left (fun acc r -> acc +. r.duration) 0.0 results
  in

  (* Generate JavaScript arrays for comparison charts *)
  let js_array to_item = String.concat ", " (List.map to_item results) in
  let test_names =
    js_array (fun r -> Printf.sprintf "\"%s\"" (js_escape r.test_name))
  in
  let throughputs = js_array (fun r -> Printf.sprintf "%.2f" r.throughput) in
  let avg_latencies =
    js_array (fun r -> Printf.sprintf "%.2f" r.avg_latency)
  in
  let error_rates =
    js_array (fun r ->
        (* total_messages counts successes only, so add the errors back *)
        let attempts = r.total_messages + r.total_errors in
        if attempts > 0 then
          Printf.sprintf "%.2f"
            (float_of_int r.total_errors /. float_of_int attempts *. 100.0)
        else "0.0")
  in

  (* Per-test section: metrics, histogram and timeline charts *)
  let render_test_detail idx (r : test_result) =
    let hist_labels, hist_counts = calculate_histogram r.latency_data 20 in
    let hist_labels_str =
      String.concat ", "
        (List.map (fun s -> Printf.sprintf "\"%s\"" s) hist_labels)
    in
    let hist_counts_str =
      String.concat ", " (List.map string_of_int hist_counts)
    in

    (* Sample data points for timeline (take every Nth point if too many) *)
    let max_points = 500 in
    let sample_rate = max 1 (List.length r.latency_data / max_points) in
    let sampled_data =
      List.filteri (fun i _ -> i mod sample_rate = 0) r.latency_data
    in
    let timeline_data =
      String.concat ", "
        (List.map
           (fun (t, l) -> Printf.sprintf "{x: %.2f, y: %.3f}" t l)
           sampled_data)
    in
    let reuse_rate =
      if r.pool_stats.total_created > 0 then
        float_of_int r.pool_stats.total_reused
        /. float_of_int r.pool_stats.total_created
        *. 100.0
      else 0.0
    in

    Printf.sprintf {|
<div class="test-detail">
<h3>%s</h3>
<div class="compact-grid">
<div class="compact-metric"><span class="label">Servers:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Clients:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Msgs/Client:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Pool Size:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Total Msgs:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Duration:</span> <span class="value">%.2fs</span></div>
<div class="compact-metric highlight"><span class="label">Throughput:</span> <span class="value">%.0f/s</span></div>
<div class="compact-metric highlight"><span class="label">Avg Lat:</span> <span class="value">%.2fms</span></div>
<div class="compact-metric"><span class="label">Min Lat:</span> <span class="value">%.2fms</span></div>
<div class="compact-metric"><span class="label">Max Lat:</span> <span class="value">%.2fms</span></div>
<div class="compact-metric %s"><span class="label">Errors:</span> <span class="value">%d</span></div>
</div>
<div class="compact-grid" style="margin-top: 0.5rem;">
<div class="compact-metric"><span class="label">Conns Created:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Conns Reused:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Conns Closed:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Active:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Idle:</span> <span class="value">%d</span></div>
<div class="compact-metric"><span class="label">Reuse Rate:</span> <span class="value">%.1f%%</span></div>
</div>
<div class="chart-row">
<div class="chart-half">
<h4>Latency Distribution</h4>
<canvas id="hist_%d"></canvas>
</div>
<div class="chart-half">
<h4>Latency Timeline</h4>
<canvas id="timeline_%d"></canvas>
</div>
</div>
</div>
<script>
new Chart(document.getElementById('hist_%d'), {
type: 'bar',
data: {
labels: [%s],
datasets: [{
label: 'Count',
data: [%s],
backgroundColor: 'rgba(102, 126, 234, 0.6)',
borderColor: 'rgba(102, 126, 234, 1)',
borderWidth: 1
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: { legend: { display: false } },
scales: {
x: { title: { display: true, text: 'Latency (ms)' } },
y: { beginAtZero: true, title: { display: true, text: 'Count' } }
}
}
});

new Chart(document.getElementById('timeline_%d'), {
type: 'scatter',
data: {
datasets: [{
label: 'Latency',
data: [%s],
backgroundColor: 'rgba(118, 75, 162, 0.5)',
borderColor: 'rgba(118, 75, 162, 0.8)',
pointRadius: 2
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: { legend: { display: false } },
scales: {
x: { title: { display: true, text: 'Time (ms)' } },
y: { beginAtZero: true, title: { display: true, text: 'Latency (ms)' } }
}
}
});
</script>|}
      (html_escape r.test_name)
      r.num_servers
      r.num_clients
      r.messages_per_client
      r.pool_size
      r.total_messages
      r.duration
      r.throughput
      r.avg_latency
      r.min_latency
      r.max_latency
      (if r.total_errors > 0 then "error" else "")
      r.total_errors
      r.pool_stats.total_created
      r.pool_stats.total_reused
      r.pool_stats.total_closed
      r.pool_stats.active
      r.pool_stats.idle
      reuse_rate
      idx idx idx
      hist_labels_str
      hist_counts_str
      idx
      timeline_data
  in
  let test_details = String.concat "\n" (List.mapi render_test_detail results) in

  Printf.sprintf {|<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Connection Pool Stress Test Results</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f5f5f5;
padding: 1rem;
color: #333;
font-size: 14px;
}
.container { max-width: 1600px; margin: 0 auto; }
h1 {
color: #667eea;
text-align: center;
margin-bottom: 0.3rem;
font-size: 1.8rem;
}
.subtitle {
text-align: center;
margin-bottom: 1rem;
font-size: 0.9rem;
color: #666;
}
.summary {
background: white;
border-radius: 6px;
padding: 1rem;
margin-bottom: 1rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.summary h2 {
color: #667eea;
margin-bottom: 0.8rem;
font-size: 1.2rem;
}
.summary-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(120px, 1fr));
gap: 0.8rem;
}
.summary-metric {
text-align: center;
padding: 0.8rem;
background: linear-gradient(135deg, #667eea 0%%, #764ba2 100%%);
border-radius: 4px;
color: white;
}
.summary-metric-label {
font-size: 0.75rem;
opacity: 0.9;
margin-bottom: 0.3rem;
}
.summary-metric-value {
font-size: 1.4rem;
font-weight: bold;
}
.comparison {
background: white;
border-radius: 6px;
padding: 1rem;
margin-bottom: 1rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.comparison h2 {
color: #667eea;
margin-bottom: 0.8rem;
font-size: 1.2rem;
}
.comparison-charts {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 1rem;
}
.comparison-chart {
height: 200px;
position: relative;
}
.test-detail {
background: white;
border-radius: 6px;
padding: 1rem;
margin-bottom: 1rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
border-left: 3px solid #667eea;
}
.test-detail h3 {
color: #764ba2;
margin-bottom: 0.6rem;
font-size: 1.1rem;
}
.test-detail h4 {
color: #666;
margin-bottom: 0.4rem;
font-size: 0.9rem;
font-weight: 500;
}
.compact-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(100px, 1fr));
gap: 0.4rem;
margin-bottom: 0.8rem;
font-size: 0.85rem;
}
.compact-metric {
background: #f8f9fa;
padding: 0.4rem 0.6rem;
border-radius: 3px;
display: flex;
justify-content: space-between;
align-items: center;
}
.compact-metric .label {
color: #666;
font-weight: 500;
}
.compact-metric .value {
color: #333;
font-weight: 600;
}
.compact-metric.highlight {
background: linear-gradient(135deg, #667eea 0%%, #764ba2 100%%);
color: white;
}
.compact-metric.highlight .label,
.compact-metric.highlight .value {
color: white;
}
.compact-metric.error {
background: #fee;
border: 1px solid #fcc;
}
.chart-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
}
.chart-half {
position: relative;
height: 220px;
}
@media (max-width: 1200px) {
.comparison-charts { grid-template-columns: 1fr; }
.chart-row { grid-template-columns: 1fr; }
}
@media (max-width: 768px) {
.compact-grid { grid-template-columns: repeat(2, 1fr); }
}
</style>
</head>
<body>
<div class="container">
<h1>Connection Pool Stress Test Results</h1>
<div class="subtitle">%s</div>

<div class="summary">
<h2>Summary</h2>
<div class="summary-grid">
<div class="summary-metric">
<div class="summary-metric-label">Tests</div>
<div class="summary-metric-value">%d</div>
</div>
<div class="summary-metric">
<div class="summary-metric-label">Messages</div>
<div class="summary-metric-value">%s</div>
</div>
<div class="summary-metric">
<div class="summary-metric-label">Errors</div>
<div class="summary-metric-value">%d</div>
</div>
<div class="summary-metric">
<div class="summary-metric-label">Duration</div>
<div class="summary-metric-value">%.1fs</div>
</div>
</div>
</div>

<div class="comparison">
<h2>Comparison</h2>
<div class="comparison-charts">
<div class="comparison-chart"><canvas id="cmpThroughput"></canvas></div>
<div class="comparison-chart"><canvas id="cmpLatency"></canvas></div>
<div class="comparison-chart"><canvas id="cmpErrors"></canvas></div>
</div>
</div>

%s
</div>

<script>
const testNames = [%s];
const throughputs = [%s];
const avgLatencies = [%s];
const errorRates = [%s];

const cc = {
primary: 'rgba(102, 126, 234, 0.8)',
secondary: 'rgba(118, 75, 162, 0.8)',
danger: 'rgba(220, 53, 69, 0.8)',
};

new Chart(document.getElementById('cmpThroughput'), {
type: 'bar',
data: {
labels: testNames,
datasets: [{
label: 'msg/s',
data: throughputs,
backgroundColor: cc.primary,
borderColor: cc.primary,
borderWidth: 1
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { display: false },
title: { display: true, text: 'Throughput (msg/s)' }
},
scales: { y: { beginAtZero: true } }
}
});

new Chart(document.getElementById('cmpLatency'), {
type: 'bar',
data: {
labels: testNames,
datasets: [{
label: 'ms',
data: avgLatencies,
backgroundColor: cc.secondary,
borderColor: cc.secondary,
borderWidth: 1
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { display: false },
title: { display: true, text: 'Avg Latency (ms)' }
},
scales: { y: { beginAtZero: true } }
}
});

new Chart(document.getElementById('cmpErrors'), {
type: 'bar',
data: {
labels: testNames,
datasets: [{
label: '%%',
data: errorRates,
backgroundColor: cc.danger,
borderColor: cc.danger,
borderWidth: 1
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { display: false },
title: { display: true, text: 'Error Rate (%%)' }
},
scales: { y: { beginAtZero: true } }
}
});
</script>
</body>
</html>|}
    date_str
    (List.length results)
    (group_thousands total_messages)
    total_errors
    total_duration
    test_details
    test_names
    throughputs
    avg_latencies
    error_rates
+
+
(** [run_all_presets ~env] runs every scenario of [presets] in list order,
    announcing each one on standard error first, and returns the results in
    the same order. *)
let run_all_presets ~env =
  let run_one (preset : config) =
    Printf.eprintf "Running test: %s\n%!" preset.name;
    run_stress_test ~env preset
  in
  List.map run_one presets
+
+
(** Parse command line arguments *)
+
type mode =
+
| Single of config
+
| AllPresets
+
| Extended
+
| ListPresets
+
+
(** [parse_args ()] parses the command line and returns
    [(mode, custom_config, output_file)].

    [custom_config] is built from the individual options ([-n], [-s], [-c],
    [-m], [-p], [-b], [-P]), each defaulting to the matching field of
    [default_config]. [mode] is [Single default_config] unless [--all],
    [--extended], [--list] or [--preset NAME] was given. Anonymous arguments
    are ignored.

    An unknown preset name is reported through [Arg.Bad], so [Arg.parse]
    prints the error with the usage text instead of the program dying on an
    uncaught exception. *)
let parse_args () =
  let mode = ref (Single default_config) in
  let name = ref default_config.name in
  let num_servers = ref default_config.num_servers in
  let num_clients = ref default_config.num_clients in
  let messages_per_client = ref default_config.messages_per_client in
  let max_parallel = ref default_config.max_parallel_clients in
  let message_size = ref default_config.message_size in
  let pool_size = ref default_config.pool_size in
  let output_file = ref "stress_test_results.json" in

  let select_preset p =
    match List.find_opt (fun c -> String.equal c.name p) presets with
    | Some c -> mode := Single c
    | None -> raise (Arg.Bad (Printf.sprintf "Unknown preset: %s" p))
  in

  let specs = [
    ("--all", Arg.Unit (fun () -> mode := AllPresets),
     "Run all preset test configurations");
    ("--extended", Arg.Unit (fun () -> mode := Extended),
     "Run extended stress test (30 servers, 1000 clients, 100 msgs each = 3M messages)");
    ("--list", Arg.Unit (fun () -> mode := ListPresets),
     "List available presets");
    ("--preset", Arg.String select_preset,
     "Use a named preset configuration");
    ("-n", Arg.Set_string name, "Test name");
    ("-s", Arg.Set_int num_servers, Printf.sprintf "Number of servers (default: %d)" default_config.num_servers);
    ("-c", Arg.Set_int num_clients, Printf.sprintf "Clients per server (default: %d)" default_config.num_clients);
    ("-m", Arg.Set_int messages_per_client, Printf.sprintf "Messages per client (default: %d)" default_config.messages_per_client);
    ("-p", Arg.Set_int max_parallel, Printf.sprintf "Max parallel clients (default: %d)" default_config.max_parallel_clients);
    ("-b", Arg.Set_int message_size, Printf.sprintf "Message size (default: %d)" default_config.message_size);
    ("-P", Arg.Set_int pool_size, Printf.sprintf "Pool size per endpoint (default: %d)" default_config.pool_size);
    ("-o", Arg.Set_string output_file, "Output JSON file (default: stress_test_results.json)");
  ] in

  let usage = "Usage: stress_test [options]\n\nOptions:" in
  Arg.parse specs (fun _ -> ()) usage;

  let config = {
    name = !name;
    num_servers = !num_servers;
    num_clients = !num_clients;
    messages_per_client = !messages_per_client;
    max_parallel_clients = !max_parallel;
    message_size = !message_size;
    pool_size = !pool_size;
  } in

  (!mode, config, !output_file)
+
+
(* [write_text_file path contents] creates or truncates [path] and writes
   [contents] to it. The channel is closed even if the write raises. *)
let write_text_file path contents =
  Out_channel.with_open_text path (fun oc ->
      Out_channel.output_string oc contents)

(* Path of the HTML report that goes with the JSON file [output_file]: a
   trailing ".json" is replaced by ".html", otherwise ".html" is appended. *)
let html_report_path output_file =
  if Filename.check_suffix output_file ".json" then
    Filename.chop_suffix output_file ".json" ^ ".html"
  else output_file ^ ".html"

(* Write [results] as a JSON array to [output_file] and as an HTML report
   next to it, printing the path of each file once it has been written. *)
let write_reports ~output_file (results : test_result list) =
  let json =
    "[" ^ String.concat ",\n" (List.map result_to_json results) ^ "]"
  in
  write_text_file output_file json;
  Printf.printf "Results written to %s\n" output_file;
  let html_file = html_report_path output_file in
  write_text_file html_file (generate_html_report results);
  Printf.printf "HTML report written to %s\n" html_file

(* Program entry point: parse the command line, then list the presets or
   run the selected scenario(s) and write the JSON and HTML reports. *)
let () =
  Random.self_init ();
  let mode, custom_config, output_file = parse_args () in
  (* Run one scenario, write its reports and print a one-line summary. *)
  let run_single (config : config) =
    Eio_main.run @@ fun env ->
    let result = run_stress_test ~env config in
    write_reports ~output_file [ result ];
    Printf.printf
      "Test: %s - %d messages, %.2f msg/s, %.2fms avg latency, %d errors\n"
      result.test_name result.total_messages result.throughput
      result.avg_latency result.total_errors
  in
  match mode with
  | ListPresets ->
      Printf.printf "Available presets:\n";
      List.iter
        (fun (c : config) ->
          Printf.printf
            "  %s: %d servers, %d clients, %d msgs/client, pool=%d\n" c.name
            c.num_servers c.num_clients c.messages_per_client c.pool_size)
        presets
  | Single config ->
      (* A selected preset wins; otherwise the individual options apply. *)
      run_single
        (if String.equal config.name "default" then custom_config else config)
  | AllPresets ->
      Eio_main.run (fun env ->
          let results = run_all_presets ~env in
          write_reports ~output_file results;
          List.iter
            (fun (r : test_result) ->
              Printf.printf
                "  %s: %d messages, %.2f msg/s, %.2fms avg latency, %d errors\n"
                r.test_name r.total_messages r.throughput r.avg_latency
                r.total_errors)
            results)
  | Extended ->
      Printf.printf
        "Running extended stress test: %d servers, %d clients/server, %d msgs/client\n"
        extended_preset.num_servers extended_preset.num_clients
        extended_preset.messages_per_client;
      Printf.printf "Total messages: %d\n%!"
        (extended_preset.num_servers * extended_preset.num_clients
       * extended_preset.messages_per_client);
      run_single extended_preset