TCP/TLS connection pooling for Eio

init

+17 -1
.gitignore
···
-
_build
+
# OCaml build artifacts
+
_build/
+
*.install
+
*.merlin
+
+
# Third-party sources (fetch locally with opam source)
+
third_party/
+
+
# Editor and OS files
+
.DS_Store
+
*.swp
+
*~
+
.vscode/
+
.idea/
+
+
# Opam local switch
+
_opam/
+1
.ocamlformat
···
+
version=0.28.1
+53
.tangled/workflows/build.yml
···
+
when:
+
- event: ["push", "pull_request"]
+
branch: ["main"]
+
+
engine: nixery
+
+
dependencies:
+
nixpkgs:
+
- shell
+
- stdenv
+
- findutils
+
- binutils
+
- libunwind
+
- ncurses
+
- opam
+
- git
+
- gawk
+
- gnupatch
+
- gnum4
+
- gnumake
+
- gnutar
+
- gnused
+
- gnugrep
+
- diffutils
+
- gzip
+
- bzip2
+
- gcc
+
- ocaml
+
- pkg-config
+
+
steps:
+
- name: opam
+
command: |
+
opam init --disable-sandboxing -a -y
+
- name: repo
+
command: |
+
opam repo add aoah https://tangled.org/anil.recoil.org/aoah-opam-repo.git
+
- name: switch
+
command: |
+
opam install . --confirm-level=unsafe-yes --deps-only
+
- name: build
+
command: |
+
opam exec -- dune build -p conpool
+
- name: switch-test
+
command: |
+
opam install . --confirm-level=unsafe-yes --deps-only --with-test
+
- name: test
+
command: |
+
opam exec -- dune runtest --verbose
+
- name: doc
+
command: |
+
opam install -y odoc
+
opam exec -- dune build @doc
+3
CHANGES.md
···
+
# v1.0.0 (dev)
+
+
- Initial release of Conpool
+15
LICENSE.md
···
+
ISC License
+
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>
+
+
Permission to use, copy, modify, and distribute this software for any
+
purpose with or without fee is hereby granted, provided that the above
+
copyright notice and this permission notice appear in all copies.
+
+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+113
README.md
···
+
# Conpool - Protocol-agnostic Connection Pooling for Eio
+
+
Conpool is a connection pooling library built on Eio that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol.
+
+
## Key Features
+
+
- **Protocol-agnostic**: Works with HTTP, Redis, PostgreSQL, or any TCP-based protocol
+
- **Health validation**: Automatically validates connections before reuse
+
- **Per-endpoint limits**: Independent connection limits and pooling for each endpoint
+
- **TLS support**: Optional TLS configuration for secure connections
+
- **Statistics & monitoring**: Track connection usage, hits/misses, and health status
+
- **Built on Eio**: Leverages Eio's structured concurrency and resource management
+
+
## Usage
+
+
Basic example establishing a connection pool:
+
+
```ocaml
+
open Eio.Std
+
+
let run env =
+
Switch.run (fun sw ->
+
(* Create a connection pool *)
+
let pool = Conpool.create
+
~sw
+
~net:(Eio.Stdenv.net env)
+
~clock:(Eio.Stdenv.clock env)
+
()
+
in
+
+
(* Define an endpoint *)
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:80 in
+
+
(* Use a connection from the pool *)
+
Conpool.with_connection pool endpoint (fun conn ->
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf
+
)
+
)
+
```
+
+
With TLS configuration:
+
+
```ocaml
+
let run env =
+
Switch.run (fun sw ->
+
(* Create TLS configuration *)
+
let tls = Conpool.Tls_config.make
+
~authenticator:(Ca_certs.authenticator ())
+
()
+
in
+
+
(* Create pool with TLS *)
+
let pool = Conpool.create
+
~sw
+
~net:(Eio.Stdenv.net env)
+
~clock:(Eio.Stdenv.clock env)
+
~tls
+
()
+
in
+
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
+
Conpool.with_connection pool endpoint (fun conn ->
+
(* Use TLS-encrypted connection *)
+
...
+
)
+
)
+
```
+
+
Custom pool configuration:
+
+
```ocaml
+
let config = Conpool.Config.make
+
~max_connections_per_endpoint:20
+
~max_idle_per_endpoint:5
+
~connection_timeout:10.0
+
~validation_interval:300.0
+
()
+
in
+
+
let pool = Conpool.create ~sw ~net ~clock ~config ()
+
```
+
+
Monitor pool statistics:
+
+
```ocaml
+
let stats = Conpool.stats pool endpoint in
+
Printf.printf "Active: %d, Idle: %d, Hits: %d, Misses: %d\n"
+
(Conpool.Stats.active_connections stats)
+
(Conpool.Stats.idle_connections stats)
+
(Conpool.Stats.cache_hits stats)
+
(Conpool.Stats.cache_misses stats)
+
```
+
+
## Installation
+
+
```
+
opam install conpool
+
```
+
+
## Documentation
+
+
API documentation is available at https://tangled.org/@anil.recoil.org/ocaml-conpool or via:
+
+
```
+
opam install conpool
+
odig doc conpool
+
```
+
+
## License
+
+
ISC
+8 -8
conpool.opam
···
synopsis: "Protocol-agnostic TCP/IP connection pooling library for Eio"
description:
"Conpool is a connection pooling library built on Eio.Pool that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.)"
-
maintainer: ["Your Name"]
-
authors: ["Your Name"]
-
license: "MIT"
-
homepage: "https://github.com/username/conpool"
-
bug-reports: "https://github.com/username/conpool/issues"
+
maintainer: ["Anil Madhavapeddy <anil@recoil.org>"]
+
authors: ["Anil Madhavapeddy <anil@recoil.org>"]
+
license: "ISC"
+
homepage: "https://tangled.org/@anil.recoil.org/ocaml-conpool"
+
bug-reports: "https://tangled.org/@anil.recoil.org/ocaml-conpool/issues"
depends: [
-
"ocaml"
-
"dune" {>= "3.0" & >= "3.0"}
+
"ocaml" {>= "5.1.0"}
+
"dune" {>= "3.20" & >= "3.0"}
"eio"
"tls-eio" {>= "1.0"}
"logs"
···
"@doc" {with-doc}
]
]
-
dev-repo: "git+https://github.com/username/conpool.git"
+
x-maintenance-intent: ["(latest)"]
+11 -11
dune-project
···
-
(lang dune 3.0)
+
(lang dune 3.20)
+
(name conpool)
(generate_opam_files true)
-
(source
-
(github username/conpool))
-
-
(authors "Your Name")
-
-
(maintainers "Your Name")
-
-
(license MIT)
+
(license ISC)
+
(authors "Anil Madhavapeddy <anil@recoil.org>")
+
(homepage "https://tangled.org/@anil.recoil.org/ocaml-conpool")
+
(maintainers "Anil Madhavapeddy <anil@recoil.org>")
+
(bug_reports "https://tangled.org/@anil.recoil.org/ocaml-conpool/issues")
+
(maintenance_intent "(latest)")
(package
(name conpool)
(synopsis "Protocol-agnostic TCP/IP connection pooling library for Eio")
(description "Conpool is a connection pooling library built on Eio.Pool that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.)")
(depends
-
ocaml
+
(ocaml (>= 5.1.0))
(dune (>= 3.0))
eio
(tls-eio (>= 1.0))
logs
fmt
-
cmdliner))
+
cmdliner
+
(odoc :with-doc)))
+30 -25
lib/cmd.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Cmdliner terms for connection pool configuration *)
open Cmdliner
let max_connections_per_endpoint =
let doc = "Maximum concurrent connections per endpoint." in
-
Arg.(value & opt int 10 & info ["max-connections-per-endpoint"] ~doc ~docv:"NUM")
+
Arg.(
+
value & opt int 10
+
& info [ "max-connections-per-endpoint" ] ~doc ~docv:"NUM")
let max_idle_time =
let doc = "Maximum time a connection can sit idle in seconds." in
-
Arg.(value & opt float 60.0 & info ["max-idle-time"] ~doc ~docv:"SECONDS")
+
Arg.(value & opt float 60.0 & info [ "max-idle-time" ] ~doc ~docv:"SECONDS")
let max_connection_lifetime =
let doc = "Maximum connection age in seconds." in
-
Arg.(value & opt float 300.0 & info ["max-connection-lifetime"] ~doc ~docv:"SECONDS")
+
Arg.(
+
value & opt float 300.0
+
& info [ "max-connection-lifetime" ] ~doc ~docv:"SECONDS")
let max_connection_uses =
let doc = "Maximum times a connection can be reused (omit for unlimited)." in
-
Arg.(value & opt (some int) None & info ["max-connection-uses"] ~doc ~docv:"NUM")
+
Arg.(
+
value
+
& opt (some int) None
+
& info [ "max-connection-uses" ] ~doc ~docv:"NUM")
let connect_timeout =
let doc = "Connection timeout in seconds." in
-
Arg.(value & opt float 10.0 & info ["connect-timeout"] ~doc ~docv:"SECONDS")
+
Arg.(value & opt float 10.0 & info [ "connect-timeout" ] ~doc ~docv:"SECONDS")
let connect_retry_count =
let doc = "Number of connection retry attempts." in
-
Arg.(value & opt int 3 & info ["connect-retry-count"] ~doc ~docv:"NUM")
+
Arg.(value & opt int 3 & info [ "connect-retry-count" ] ~doc ~docv:"NUM")
let connect_retry_delay =
let doc = "Initial retry delay in seconds (with exponential backoff)." in
-
Arg.(value & opt float 0.1 & info ["connect-retry-delay"] ~doc ~docv:"SECONDS")
+
Arg.(
+
value & opt float 0.1 & info [ "connect-retry-delay" ] ~doc ~docv:"SECONDS")
let config =
-
let make max_conn max_idle max_lifetime max_uses timeout retry_count retry_delay =
-
Config.make
-
~max_connections_per_endpoint:max_conn
-
~max_idle_time:max_idle
-
~max_connection_lifetime:max_lifetime
-
?max_connection_uses:max_uses
-
~connect_timeout:timeout
-
~connect_retry_count:retry_count
-
~connect_retry_delay:retry_delay
-
()
+
let make max_conn max_idle max_lifetime max_uses timeout retry_count
+
retry_delay =
+
Config.make ~max_connections_per_endpoint:max_conn ~max_idle_time:max_idle
+
~max_connection_lifetime:max_lifetime ?max_connection_uses:max_uses
+
~connect_timeout:timeout ~connect_retry_count:retry_count
+
~connect_retry_delay:retry_delay ()
in
-
Term.(const make
-
$ max_connections_per_endpoint
-
$ max_idle_time
-
$ max_connection_lifetime
-
$ max_connection_uses
-
$ connect_timeout
-
$ connect_retry_count
-
$ connect_retry_delay)
+
Term.(
+
const make $ max_connections_per_endpoint $ max_idle_time
+
$ max_connection_lifetime $ max_connection_uses $ connect_timeout
+
$ connect_retry_count $ connect_retry_delay)
+20 -22
lib/cmd.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Cmdliner terms for connection pool configuration *)
(** {1 Configuration Terms} *)
val max_connections_per_endpoint : int Cmdliner.Term.t
-
(** Cmdliner term for maximum connections per endpoint.
-
Default: 10
-
Flag: [--max-connections-per-endpoint] *)
+
(** Cmdliner term for maximum connections per endpoint. Default: 10 Flag:
+
[--max-connections-per-endpoint] *)
val max_idle_time : float Cmdliner.Term.t
-
(** Cmdliner term for maximum idle time in seconds.
-
Default: 60.0
-
Flag: [--max-idle-time] *)
+
(** Cmdliner term for maximum idle time in seconds. Default: 60.0 Flag:
+
[--max-idle-time] *)
val max_connection_lifetime : float Cmdliner.Term.t
-
(** Cmdliner term for maximum connection lifetime in seconds.
-
Default: 300.0
+
(** Cmdliner term for maximum connection lifetime in seconds. Default: 300.0
Flag: [--max-connection-lifetime] *)
val max_connection_uses : int option Cmdliner.Term.t
-
(** Cmdliner term for maximum connection uses.
-
Default: None (unlimited)
-
Flag: [--max-connection-uses] *)
+
(** Cmdliner term for maximum connection uses. Default: None (unlimited) Flag:
+
[--max-connection-uses] *)
val connect_timeout : float Cmdliner.Term.t
-
(** Cmdliner term for connection timeout in seconds.
-
Default: 10.0
-
Flag: [--connect-timeout] *)
+
(** Cmdliner term for connection timeout in seconds. Default: 10.0 Flag:
+
[--connect-timeout] *)
val connect_retry_count : int Cmdliner.Term.t
-
(** Cmdliner term for number of connection retry attempts.
-
Default: 3
-
Flag: [--connect-retry-count] *)
+
(** Cmdliner term for number of connection retry attempts. Default: 3 Flag:
+
[--connect-retry-count] *)
val connect_retry_delay : float Cmdliner.Term.t
-
(** Cmdliner term for initial retry delay in seconds.
-
Default: 0.1
-
Flag: [--connect-retry-delay] *)
+
(** Cmdliner term for initial retry delay in seconds. Default: 0.1 Flag:
+
[--connect-retry-delay] *)
(** {1 Combined Terms} *)
val config : Config.t Cmdliner.Term.t
(** Cmdliner term that combines all configuration options into a {!Config.t}.
-
This term can be used in your application's main command to accept
-
all connection pool configuration options from the command line. *)
+
This term can be used in your application's main command to accept all
+
connection pool configuration options from the command line. *)
+46 -37
lib/config.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Configuration for connection pools *)
let src = Logs.Src.create "conpool.config" ~doc:"Connection pool configuration"
+
module Log = (val Logs.src_log src : Logs.LOG)
type t = {
···
max_idle_time : float;
max_connection_lifetime : float;
max_connection_uses : int option;
-
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
+
health_check :
+
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) option;
connect_timeout : float option;
connect_retry_count : int;
connect_retry_delay : float;
···
on_connection_reused : (Endpoint.t -> unit) option;
}
-
let make
-
?(max_connections_per_endpoint = 10)
-
?(max_idle_time = 60.0)
-
?(max_connection_lifetime = 300.0)
-
?max_connection_uses
-
?health_check
-
?(connect_timeout = 10.0)
-
?(connect_retry_count = 3)
-
?(connect_retry_delay = 0.1)
-
?on_connection_created
-
?on_connection_closed
-
?on_connection_reused
-
() =
+
let make ?(max_connections_per_endpoint = 10) ?(max_idle_time = 60.0)
+
?(max_connection_lifetime = 300.0) ?max_connection_uses ?health_check
+
?(connect_timeout = 10.0) ?(connect_retry_count = 3)
+
?(connect_retry_delay = 0.1) ?on_connection_created ?on_connection_closed
+
?on_connection_reused () =
(* Validate parameters *)
if max_connections_per_endpoint <= 0 then
-
invalid_arg (Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
-
max_connections_per_endpoint);
+
invalid_arg
+
(Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
+
max_connections_per_endpoint);
if max_idle_time <= 0.0 then
-
invalid_arg (Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
+
invalid_arg
+
(Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
if max_connection_lifetime <= 0.0 then
-
invalid_arg (Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
-
max_connection_lifetime);
+
invalid_arg
+
(Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
+
max_connection_lifetime);
(match max_connection_uses with
-
| Some n when n <= 0 ->
-
invalid_arg (Printf.sprintf "max_connection_uses must be positive, got %d" n)
-
| _ -> ());
+
| Some n when n <= 0 ->
+
invalid_arg
+
(Printf.sprintf "max_connection_uses must be positive, got %d" n)
+
| _ -> ());
if connect_timeout <= 0.0 then
-
invalid_arg (Printf.sprintf "connect_timeout must be positive, got %.2f" connect_timeout);
+
invalid_arg
+
(Printf.sprintf "connect_timeout must be positive, got %.2f"
+
connect_timeout);
if connect_retry_count < 0 then
-
invalid_arg (Printf.sprintf "connect_retry_count must be non-negative, got %d"
-
connect_retry_count);
+
invalid_arg
+
(Printf.sprintf "connect_retry_count must be non-negative, got %d"
+
connect_retry_count);
if connect_retry_delay <= 0.0 then
-
invalid_arg (Printf.sprintf "connect_retry_delay must be positive, got %.2f"
-
connect_retry_delay);
+
invalid_arg
+
(Printf.sprintf "connect_retry_delay must be positive, got %.2f"
+
connect_retry_delay);
Log.debug (fun m ->
-
m "Creating config: max_connections=%d, max_idle=%.1fs, max_lifetime=%.1fs"
-
max_connections_per_endpoint max_idle_time max_connection_lifetime);
+
m
+
"Creating config: max_connections=%d, max_idle=%.1fs, \
+
max_lifetime=%.1fs"
+
max_connections_per_endpoint max_idle_time max_connection_lifetime);
{
max_connections_per_endpoint;
max_idle_time;
···
}
let default = make ()
-
let max_connections_per_endpoint t = t.max_connections_per_endpoint
let max_idle_time t = t.max_idle_time
let max_connection_lifetime t = t.max_connection_lifetime
···
- connect_timeout: %s@,\
- connect_retry_count: %d@,\
- connect_retry_delay: %.2fs@]"
-
t.max_connections_per_endpoint
-
t.max_idle_time
-
t.max_connection_lifetime
-
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
-
(match t.connect_timeout with Some f -> Fmt.str "%.1fs" f | None -> "none")
-
t.connect_retry_count
-
t.connect_retry_delay
+
t.max_connections_per_endpoint t.max_idle_time t.max_connection_lifetime
+
(match t.max_connection_uses with
+
| Some n -> string_of_int n
+
| None -> "unlimited")
+
(match t.connect_timeout with
+
| Some f -> Fmt.str "%.1fs" f
+
| None -> "none")
+
t.connect_retry_count t.connect_retry_delay
+25 -15
lib/config.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Configuration for connection pools *)
(** {1 Logging} *)
···
val src : Logs.Src.t
(** Logs source for configuration operations. Configure logging with:
{[
-
Logs.Src.set_level Conpool.Config.src (Some Logs.Debug);
-
]}
-
*)
+
Logs.Src.set_level Conpool.Config.src (Some Logs.Debug)
+
]} *)
(** {1 Type} *)
···
?max_idle_time:float ->
?max_connection_lifetime:float ->
?max_connection_uses:int ->
-
?health_check:([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) ->
+
?health_check:
+
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) ->
?connect_timeout:float ->
?connect_retry_count:int ->
?connect_retry_delay:float ->
?on_connection_created:(Endpoint.t -> unit) ->
?on_connection_closed:(Endpoint.t -> unit) ->
?on_connection_reused:(Endpoint.t -> unit) ->
-
unit -> t
+
unit ->
+
t
(** Create pool configuration with optional parameters.
-
@param max_connections_per_endpoint Maximum concurrent connections per endpoint (default: 10)
-
@param max_idle_time Maximum time a connection can sit idle in seconds (default: 60.0)
-
@param max_connection_lifetime Maximum connection age in seconds (default: 300.0)
-
@param max_connection_uses Maximum times a connection can be reused (default: unlimited)
+
@param max_connections_per_endpoint
+
Maximum concurrent connections per endpoint (default: 10)
+
@param max_idle_time
+
Maximum time a connection can sit idle in seconds (default: 60.0)
+
@param max_connection_lifetime
+
Maximum connection age in seconds (default: 300.0)
+
@param max_connection_uses
+
Maximum times a connection can be reused (default: unlimited)
@param health_check Custom health check function (default: none)
@param connect_timeout Connection timeout in seconds (default: 10.0)
@param connect_retry_count Number of connection retry attempts (default: 3)
-
@param connect_retry_delay Initial retry delay in seconds, with exponential backoff (default: 0.1)
+
@param connect_retry_delay
+
Initial retry delay in seconds, with exponential backoff (default: 0.1)
@param on_connection_created Hook called when a connection is created
@param on_connection_closed Hook called when a connection is closed
-
@param on_connection_reused Hook called when a connection is reused
-
*)
+
@param on_connection_reused Hook called when a connection is reused *)
val default : t
(** Sensible defaults for most use cases:
···
- connect_timeout: 10.0s
- connect_retry_count: 3
- connect_retry_delay: 0.1s
-
- hooks: none
-
*)
+
- hooks: none *)
(** {1 Accessors} *)
···
val max_connection_uses : t -> int option
(** Get maximum connection uses, if any. *)
-
val health_check : t -> ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option
+
val health_check :
+
t -> ([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) option
(** Get custom health check function, if any. *)
val connect_timeout : t -> float option
+16 -13
lib/connection.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Internal connection representation - not exposed in public API *)
-
let src = Logs.Src.create "conpool.connection" ~doc:"Connection pool internal connection management"
+
let src =
+
Logs.Src.create "conpool.connection"
+
~doc:"Connection pool internal connection management"
+
module Log = (val Logs.src_log src : Logs.LOG)
type t = {
-
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
+
flow : [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t;
created_at : float;
mutable last_used : float;
mutable use_count : int;
···
let flow t = t.flow
let endpoint t = t.endpoint
let created_at t = t.created_at
-
-
let last_used t =
-
Eio.Mutex.use_ro t.mutex (fun () -> t.last_used)
-
-
let use_count t =
-
Eio.Mutex.use_ro t.mutex (fun () -> t.use_count)
+
let last_used t = Eio.Mutex.use_ro t.mutex (fun () -> t.last_used)
+
let use_count t = Eio.Mutex.use_ro t.mutex (fun () -> t.use_count)
let update_usage t ~now =
Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
-
t.last_used <- now;
-
t.use_count <- t.use_count + 1
-
)
+
t.last_used <- now;
+
t.use_count <- t.use_count + 1)
let pp ppf t =
let uses = Eio.Mutex.use_ro t.mutex (fun () -> t.use_count) in
-
Fmt.pf ppf "Connection(endpoint=%a, age=%.2fs, uses=%d)"
-
Endpoint.pp t.endpoint
+
Fmt.pf ppf "Connection(endpoint=%a, age=%.2fs, uses=%d)" Endpoint.pp
+
t.endpoint
(Unix.gettimeofday () -. t.created_at)
uses
+318 -275
lib/conpool.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
+
module Log = (val Logs.src_log src : Logs.LOG)
(* Re-export submodules *)
···
type error =
| Dns_resolution_failed of { hostname : string }
-
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
+
| Connection_failed of {
+
endpoint : Endpoint.t;
+
attempts : int;
+
last_error : string;
+
}
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
| Invalid_config of string
| Invalid_endpoint of string
···
| Dns_resolution_failed { hostname } ->
Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
| Connection_failed { endpoint; attempts; last_error } ->
-
Fmt.pf ppf "Failed to connect to %a after %d attempts: %s"
-
Endpoint.pp endpoint attempts last_error
+
Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
+
endpoint attempts last_error
| Connection_timeout { endpoint; timeout } ->
-
Fmt.pf ppf "Connection timeout to %a after %.2fs"
-
Endpoint.pp endpoint timeout
-
| Invalid_config msg ->
-
Fmt.pf ppf "Invalid configuration: %s" msg
-
| Invalid_endpoint msg ->
-
Fmt.pf ppf "Invalid endpoint: %s" msg
+
Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
+
timeout
+
| Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
+
| Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
type endp_stats = {
mutable active : int;
···
type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t
-
module EndpointTbl = Hashtbl.Make(struct
+
module EndpointTbl = Hashtbl.Make (struct
type t = Endpoint.t
+
let equal = Endpoint.equal
let hash = Endpoint.hash
end)
-
let get_time (pool : ('clock, 'net) internal) =
-
Eio.Time.now pool.clock
+
let get_time (pool : ('clock, 'net) internal) = Eio.Time.now pool.clock
-
let create_endp_stats () = {
-
active = 0;
-
idle = 0;
-
total_created = 0;
-
total_reused = 0;
-
total_closed = 0;
-
errors = 0;
-
}
+
let create_endp_stats () =
+
{
+
active = 0;
+
idle = 0;
+
total_created = 0;
+
total_reused = 0;
+
total_closed = 0;
+
errors = 0;
+
}
let snapshot_stats (stats : endp_stats) : Stats.t =
-
Stats.make
-
~active:stats.active
-
~idle:stats.idle
-
~total_created:stats.total_created
-
~total_reused:stats.total_reused
-
~total_closed:stats.total_closed
-
~errors:stats.errors
+
Stats.make ~active:stats.active ~idle:stats.idle
+
~total_created:stats.total_created ~total_reused:stats.total_reused
+
~total_closed:stats.total_closed ~errors:stats.errors
(** {1 DNS Resolution} *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
-
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
+
let addrs =
+
Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
+
~service:(string_of_int (Endpoint.port endpoint))
+
in
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
match addrs with
| addr :: _ ->
-
Log.debug (fun m -> m "Resolved %a to %a"
-
Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
+
Log.debug (fun m ->
+
m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
addr
| [] ->
-
Log.err (fun m -> m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
-
raise (Pool_error (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
+
Log.err (fun m ->
+
m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
+
raise
+
(Pool_error
+
(Dns_resolution_failed { hostname = Endpoint.host endpoint }))
(** {1 Connection Creation with Retry} *)
-
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint attempt last_error =
+
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
+
attempt last_error =
let retry_count = Config.connect_retry_count pool.config in
if attempt > retry_count then begin
-
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
-
Endpoint.pp endpoint retry_count);
-
raise (Pool_error (Connection_failed { endpoint; attempts = retry_count; last_error }))
+
Log.err (fun m ->
+
m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
+
retry_count);
+
raise
+
(Pool_error
+
(Connection_failed { endpoint; attempts = retry_count; last_error }))
end;
-
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
-
Endpoint.pp endpoint attempt retry_count);
+
Log.debug (fun m ->
+
m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
+
retry_count);
try
let addr = resolve_endpoint pool endpoint in
···
let socket =
match Config.connect_timeout pool.config with
| Some timeout ->
-
Eio.Time.with_timeout_exn pool.clock timeout
-
(fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
-
| None ->
-
Eio.Net.connect ~sw:pool.sw pool.net addr
+
Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
+
Eio.Net.connect ~sw:pool.sw pool.net addr)
+
| None -> Eio.Net.connect ~sw:pool.sw pool.net addr
in
-
Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "TCP connection established to %a" Endpoint.pp endpoint);
-
let flow = match pool.tls with
-
| None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
+
let flow =
+
match pool.tls with
+
| None ->
+
(socket :> [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t)
| Some tls_cfg ->
-
Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
-
let host = match Tls_config.servername tls_cfg with
+
Log.debug (fun m ->
+
m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
+
let host =
+
match Tls_config.servername tls_cfg with
| Some name -> Domain_name.(host_exn (of_string_exn name))
-
| None -> Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
+
| None ->
+
Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
in
-
let tls_flow = Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket in
-
Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
-
(tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
+
let tls_flow =
+
Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket
+
in
+
Log.info (fun m ->
+
m "TLS connection established to %a" Endpoint.pp endpoint);
+
(tls_flow :> [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t)
in
let now = get_time pool in
···
endpoint;
mutex = Eio.Mutex.create ();
}
-
with
| Eio.Time.Timeout as e ->
-
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
+
Log.warn (fun m ->
+
m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
let error_msg = Printexc.to_string e in
if attempt >= Config.connect_retry_count pool.config then
(* Last attempt - convert to our error type *)
···
| Some timeout ->
raise (Pool_error (Connection_timeout { endpoint; timeout }))
| None ->
-
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
+
raise
+
(Pool_error
+
(Connection_failed
+
{ endpoint; attempts = attempt; last_error = error_msg }))
else begin
(* Retry with exponential backoff *)
-
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
+
let delay =
+
Config.connect_retry_delay pool.config
+
*. (2.0 ** float_of_int (attempt - 1))
+
in
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1) error_msg
end
| e ->
(* Other errors - retry with backoff *)
let error_msg = Printexc.to_string e in
-
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
-
attempt Endpoint.pp endpoint error_msg);
+
Log.warn (fun m ->
+
m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
+
endpoint error_msg);
if attempt < Config.connect_retry_count pool.config then (
-
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
+
let delay =
+
Config.connect_retry_delay pool.config
+
*. (2.0 ** float_of_int (attempt - 1))
+
in
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1) error_msg
-
) else
-
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
+
create_connection_with_retry pool endpoint (attempt + 1) error_msg)
+
else
+
raise
+
(Pool_error
+
(Connection_failed
+
{ endpoint; attempts = attempt; last_error = error_msg }))
let create_connection (pool : ('clock, 'net) internal) endpoint =
create_connection_with_retry pool endpoint 1 "No attempts made"
···
let age = now -. Connection.created_at conn in
let max_lifetime = Config.max_connection_lifetime pool.config in
if age > max_lifetime then begin
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) age max_lifetime);
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) age max_lifetime);
false
end
-
(* Check idle time *)
-
else begin
+
else begin
let max_idle = Config.max_idle_time pool.config in
-
if (now -. Connection.last_used conn) > max_idle then begin
+
if now -. Connection.last_used conn > max_idle then begin
let idle_time = now -. Connection.last_used conn in
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
-
false
-
end
-
-
(* Check use count *)
-
else if (match Config.max_connection_uses pool.config with
-
| Some max -> Connection.use_count conn >= max
-
| None -> false) then begin
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
-
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
false
-
end
-
-
(* Optional: custom health check *)
-
else if (match Config.health_check pool.config with
-
| Some check ->
-
(try
-
let healthy = check (Connection.flow conn) in
-
if not healthy then
-
Log.debug (fun m -> m "Connection to %a failed custom health check"
-
Endpoint.pp (Connection.endpoint conn));
-
not healthy
-
with e ->
-
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
-
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
-
true) (* Exception in health check = unhealthy *)
-
| None -> false) then
+
end (* Check use count *)
+
else if
+
match Config.max_connection_uses pool.config with
+
| Some max -> Connection.use_count conn >= max
+
| None -> false
+
then begin
+
Log.debug (fun m ->
+
m "Connection to %a unhealthy: exceeded max use count (%d)"
+
Endpoint.pp (Connection.endpoint conn)
+
(Connection.use_count conn));
false
-
-
(* Optional: check if socket still connected *)
+
end (* Optional: custom health check *)
+
else if
+
match Config.health_check pool.config with
+
| Some check -> (
+
try
+
let healthy = check (Connection.flow conn) in
+
if not healthy then
+
Log.debug (fun m ->
+
m "Connection to %a failed custom health check" Endpoint.pp
+
(Connection.endpoint conn));
+
not healthy
+
with e ->
+
Log.debug (fun m ->
+
m "Connection to %a health check raised exception: %s"
+
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
+
true (* Exception in health check = unhealthy *))
+
| None -> false
+
then false (* Optional: check if socket still connected *)
else if check_readable then
try
(* TODO avsm: a sockopt for this? *)
true
-
with
-
| _ -> false
-
+
with _ -> false
else begin
-
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
-
Endpoint.pp (Connection.endpoint conn)
-
age
-
(now -. Connection.last_used conn)
-
(Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
+
Endpoint.pp (Connection.endpoint conn) age
+
(now -. Connection.last_used conn)
+
(Connection.use_count conn));
true
end
end
···
(** {1 Internal Pool Operations} *)
let close_internal (pool : ('clock, 'net) internal) conn =
-
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
-
Endpoint.pp (Connection.endpoint conn)
-
(get_time pool -. Connection.created_at conn)
-
(Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
+
(Connection.endpoint conn)
+
(get_time pool -. Connection.created_at conn)
+
(Connection.use_count conn));
Eio.Cancel.protect (fun () ->
-
try
-
Eio.Flow.close (Connection.flow conn)
-
with _ -> ()
-
);
+
try Eio.Flow.close (Connection.flow conn) with _ -> ());
(* Call hook if configured *)
-
Option.iter (fun f -> f (Connection.endpoint conn)) (Config.on_connection_closed pool.config)
+
Option.iter
+
(fun f -> f (Connection.endpoint conn))
+
(Config.on_connection_closed pool.config)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
-
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
(* First try with read lock *)
-
match Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
-
Hashtbl.find_opt pool.endpoints endpoint
-
) with
+
match
+
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
+
Hashtbl.find_opt pool.endpoints endpoint)
+
with
| Some ep_pool ->
-
Log.debug (fun m -> m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
ep_pool
| None ->
-
Log.debug (fun m -> m "No existing pool, need to create for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "No existing pool, need to create for %a" Endpoint.pp endpoint);
(* Need to create - use write lock *)
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
-
(* Check again in case another fiber created it *)
-
match Hashtbl.find_opt pool.endpoints endpoint with
-
| Some ep_pool ->
-
Log.debug (fun m -> m "Another fiber created pool for %a" Endpoint.pp endpoint);
-
ep_pool
-
| None ->
+
(* Check again in case another fiber created it *)
+
match Hashtbl.find_opt pool.endpoints endpoint with
+
| Some ep_pool ->
+
Log.debug (fun m ->
+
m "Another fiber created pool for %a" Endpoint.pp endpoint);
+
ep_pool
+
| None ->
(* Create new endpoint pool *)
let stats = create_endp_stats () in
let mutex = Eio.Mutex.create () in
-
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
-
Endpoint.pp endpoint (Config.max_connections_per_endpoint pool.config));
+
Log.info (fun m ->
+
m "Creating new endpoint pool for %a (max_connections=%d)"
+
Endpoint.pp endpoint
+
(Config.max_connections_per_endpoint pool.config));
-
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
-
let eio_pool = Eio.Pool.create
-
(Config.max_connections_per_endpoint pool.config)
-
~validate:(fun conn ->
-
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
-
(* Called before reusing from pool *)
-
let healthy = is_healthy pool ~check_readable:false conn in
+
let eio_pool =
+
Eio.Pool.create
+
(Config.max_connections_per_endpoint pool.config)
+
~validate:(fun conn ->
+
Log.debug (fun m ->
+
m "Validate called for connection to %a" Endpoint.pp
+
endpoint);
+
(* Called before reusing from pool *)
+
let healthy = is_healthy pool ~check_readable:false conn in
-
if healthy then (
-
Log.debug (fun m -> m "Reusing connection to %a from pool" Endpoint.pp endpoint);
+
if healthy then (
+
Log.debug (fun m ->
+
m "Reusing connection to %a from pool" Endpoint.pp
+
endpoint);
-
(* Update stats for reuse *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_reused <- stats.total_reused + 1
-
);
+
(* Update stats for reuse *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_reused <- stats.total_reused + 1);
-
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) (Config.on_connection_reused pool.config);
+
(* Call hook if configured *)
+
Option.iter
+
(fun f -> f endpoint)
+
(Config.on_connection_reused pool.config);
-
(* Run health check if configured *)
-
match Config.health_check pool.config with
-
| Some check ->
-
(try check (Connection.flow conn)
-
with _ -> false)
-
| None -> true
-
) else begin
-
Log.debug (fun m -> m "Connection to %a failed validation, creating new one" Endpoint.pp endpoint);
-
false
-
end
-
)
-
~dispose:(fun conn ->
-
(* Called when removing from pool *)
-
Eio.Cancel.protect (fun () ->
-
close_internal pool conn;
+
(* Run health check if configured *)
+
match Config.health_check pool.config with
+
| Some check -> (
+
try check (Connection.flow conn) with _ -> false)
+
| None -> true)
+
else begin
+
Log.debug (fun m ->
+
m
+
"Connection to %a failed validation, creating new \
+
one"
+
Endpoint.pp endpoint);
+
false
+
end)
+
~dispose:(fun conn ->
+
(* Called when removing from pool *)
+
Eio.Cancel.protect (fun () ->
+
close_internal pool conn;
-
(* Update stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_closed <- stats.total_closed + 1
-
)
-
)
-
)
-
(fun () ->
-
Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
-
try
-
let conn = create_connection pool endpoint in
+
(* Update stats *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_closed <- stats.total_closed + 1)))
+
(fun () ->
+
Log.debug (fun m ->
+
m "Factory function called for %a" Endpoint.pp endpoint);
+
try
+
let conn = create_connection pool endpoint in
-
Log.debug (fun m -> m "Connection created successfully for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "Connection created successfully for %a" Endpoint.pp
+
endpoint);
-
(* Update stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.total_created <- stats.total_created + 1
-
);
+
(* Update stats *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.total_created <- stats.total_created + 1);
-
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) (Config.on_connection_created pool.config);
+
(* Call hook if configured *)
+
Option.iter
+
(fun f -> f endpoint)
+
(Config.on_connection_created pool.config);
-
conn
-
with e ->
-
Log.err (fun m -> m "Factory function failed for %a: %s"
-
Endpoint.pp endpoint (Printexc.to_string e));
-
(* Update error stats *)
-
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
-
stats.errors <- stats.errors + 1
-
);
-
raise e
-
)
+
conn
+
with e ->
+
Log.err (fun m ->
+
m "Factory function failed for %a: %s" Endpoint.pp
+
endpoint (Printexc.to_string e));
+
(* Update error stats *)
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
+
stats.errors <- stats.errors + 1);
+
raise e)
in
-
Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
+
Log.debug (fun m ->
+
m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
-
let ep_pool = {
-
pool = eio_pool;
-
stats;
-
mutex;
-
} in
+
let ep_pool = { pool = eio_pool; stats; mutex } in
Hashtbl.add pool.endpoints endpoint ep_pool;
-
Log.debug (fun m -> m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
-
ep_pool
-
)
+
Log.debug (fun m ->
+
m "Endpoint pool added to hashtable for %a" Endpoint.pp
+
endpoint);
+
ep_pool)
(** {1 Public API - Pool Creation} *)
-
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : t =
-
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
-
(Config.max_connections_per_endpoint config)
-
(Config.max_idle_time config)
-
(Config.max_connection_lifetime config));
+
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
+
?(config = Config.default) () : t =
+
Log.info (fun m ->
+
m
+
"Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
+
max_lifetime=%.1fs)"
+
(Config.max_connections_per_endpoint config)
+
(Config.max_idle_time config)
+
(Config.max_connection_lifetime config));
-
let pool = {
-
sw;
-
net;
-
clock;
-
config;
-
tls;
-
endpoints = Hashtbl.create 16;
-
endpoints_mutex = Eio.Mutex.create ();
-
} in
+
let pool =
+
{
+
sw;
+
net;
+
clock;
+
config;
+
tls;
+
endpoints = Hashtbl.create 16;
+
endpoints_mutex = Eio.Mutex.create ();
+
}
+
in
(* Auto-cleanup on switch release *)
Eio.Switch.on_release sw (fun () ->
-
Eio.Cancel.protect (fun () ->
-
Log.info (fun m -> m "Closing connection pool");
-
(* Close all idle connections - active ones will be cleaned up by switch *)
-
Hashtbl.iter (fun _endpoint _ep_pool ->
-
(* Connections are bound to the switch and will be auto-closed *)
-
()
-
) pool.endpoints;
+
Eio.Cancel.protect (fun () ->
+
Log.info (fun m -> m "Closing connection pool");
+
(* Close all idle connections - active ones will be cleaned up by switch *)
+
Hashtbl.iter
+
(fun _endpoint _ep_pool ->
+
(* Connections are bound to the switch and will be auto-closed *)
+
())
+
pool.endpoints;
-
Hashtbl.clear pool.endpoints
-
)
-
);
+
Hashtbl.clear pool.endpoints));
T pool
···
(* Increment active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.active <- ep_pool.stats.active + 1
-
);
+
ep_pool.stats.active <- ep_pool.stats.active + 1);
Fun.protect
~finally:(fun () ->
(* Decrement active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.active <- ep_pool.stats.active - 1
-
);
-
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)
-
)
+
ep_pool.stats.active <- ep_pool.stats.active - 1);
+
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
(fun () ->
(* Use Eio.Pool for resource management *)
Eio.Pool.use ep_pool.pool (fun conn ->
-
Log.debug (fun m -> m "Using connection to %a (uses=%d)"
-
Endpoint.pp endpoint (Connection.use_count conn));
+
Log.debug (fun m ->
+
m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
+
(Connection.use_count conn));
-
(* Update last used time and use count *)
-
Connection.update_usage conn ~now:(get_time pool);
+
(* Update last used time and use count *)
+
Connection.update_usage conn ~now:(get_time pool);
-
(* Update idle stats (connection taken from idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)
-
);
+
(* Update idle stats (connection taken from idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
-
try
-
let result = f conn.flow in
+
try
+
let result = f conn.flow in
-
(* Success - connection will be returned to pool by Eio.Pool *)
-
(* Update idle stats (connection returned to idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- ep_pool.stats.idle + 1
-
);
+
(* Success - connection will be returned to pool by Eio.Pool *)
+
(* Update idle stats (connection returned to idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- ep_pool.stats.idle + 1);
-
result
-
with e ->
-
(* Error - close connection so it won't be reused *)
-
Log.warn (fun m -> m "Error using connection to %a: %s"
-
Endpoint.pp endpoint (Printexc.to_string e));
-
close_internal pool conn;
+
result
+
with e ->
+
(* Error - close connection so it won't be reused *)
+
Log.warn (fun m ->
+
m "Error using connection to %a: %s" Endpoint.pp endpoint
+
(Printexc.to_string e));
+
close_internal pool conn;
-
(* Update error stats *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.errors <- ep_pool.stats.errors + 1
-
);
+
(* Update error stats *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.errors <- ep_pool.stats.errors + 1);
-
raise e
-
)
-
)
+
raise e))
(** {1 Public API - Statistics} *)
let stats (T pool) endpoint =
match Hashtbl.find_opt pool.endpoints endpoint with
| Some ep_pool ->
-
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
-
snapshot_stats ep_pool.stats
-
)
+
Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
| None ->
(* No pool for this endpoint yet *)
-
Stats.make
-
~active:0
-
~idle:0
-
~total_created:0
-
~total_reused:0
-
~total_closed:0
-
~errors:0
+
Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
+
~total_closed:0 ~errors:0
let all_stats (T pool) =
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
-
Hashtbl.fold (fun endpoint ep_pool acc ->
-
let stats = Eio.Mutex.use_ro ep_pool.mutex (fun () ->
-
snapshot_stats ep_pool.stats
-
) in
-
(endpoint, stats) :: acc
-
) pool.endpoints []
-
)
+
Hashtbl.fold
+
(fun endpoint ep_pool acc ->
+
let stats =
+
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
+
snapshot_stats ep_pool.stats)
+
in
+
(endpoint, stats) :: acc)
+
pool.endpoints [])
(** {1 Public API - Pool Management} *)
···
match Hashtbl.find_opt pool.endpoints endpoint with
| Some _ep_pool ->
Eio.Cancel.protect (fun () ->
-
(* Remove endpoint pool from hashtable *)
-
(* Idle connections will be discarded *)
-
(* Active connections will be closed when returned *)
-
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
-
Hashtbl.remove pool.endpoints endpoint
-
)
-
)
+
(* Remove endpoint pool from hashtable *)
+
(* Idle connections will be discarded *)
+
(* Active connections will be closed when returned *)
+
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
+
Hashtbl.remove pool.endpoints endpoint))
| None ->
-
Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)
+
Log.debug (fun m ->
+
m "No endpoint pool found for %a" Endpoint.pp endpoint)
+41 -49
lib/conpool.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
(** {1 Logging} *)
···
(** Logs source for the main connection pool. Configure logging with:
{[
Logs.Src.set_level Conpool.src (Some Logs.Debug);
-
Logs.set_reporter (Logs_fmt.reporter ());
+
Logs.set_reporter (Logs_fmt.reporter ())
]}
Each submodule also exposes its own log source for fine-grained control:
- {!Endpoint.src} - endpoint operations
- {!Tls_config.src} - TLS configuration
-
- {!Config.src} - pool configuration
-
*)
+
- {!Config.src} - pool configuration *)
(** {1 Core Types} *)
-
(** Network endpoint representation *)
module Endpoint : module type of Endpoint
+
(** Network endpoint representation *)
-
(** TLS configuration for connection pools *)
module Tls_config : module type of Tls_config
+
(** TLS configuration for connection pools *)
-
(** Configuration for connection pools *)
module Config : module type of Config
+
(** Configuration for connection pools *)
+
module Stats : module type of Stats
(** Statistics for connection pool endpoints *)
-
module Stats : module type of Stats
-
(** Cmdliner terms for connection pool configuration *)
module Cmd : module type of Cmd
+
(** Cmdliner terms for connection pool configuration *)
(** {1 Errors} *)
type error =
| Dns_resolution_failed of { hostname : string }
-
(** DNS resolution failed for the given hostname *)
-
-
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
-
(** Failed to establish connection after all retry attempts *)
-
+
(** DNS resolution failed for the given hostname *)
+
| Connection_failed of {
+
endpoint : Endpoint.t;
+
attempts : int;
+
last_error : string;
+
} (** Failed to establish connection after all retry attempts *)
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
-
(** Connection attempt timed out *)
-
-
| Invalid_config of string
-
(** Invalid configuration parameter *)
-
-
| Invalid_endpoint of string
-
(** Invalid endpoint specification *)
+
(** Connection attempt timed out *)
+
| Invalid_config of string (** Invalid configuration parameter *)
+
| Invalid_endpoint of string (** Invalid endpoint specification *)
exception Pool_error of error
(** Exception raised by pool operations.
···
clock:'clock Eio.Time.clock ->
?tls:Tls_config.t ->
?config:Config.t ->
-
unit -> t
-
(** Create connection pool bound to switch.
-
All connections will be closed when switch is released.
+
unit ->
+
t
+
(** Create connection pool bound to switch. All connections will be closed when
+
switch is released.
@param sw Switch for resource management
@param net Network interface for creating connections
@param clock Clock for timeouts and time-based validation
@param tls Optional TLS configuration applied to all connections
-
@param config Optional pool configuration (uses Config.default if not provided) *)
+
@param config
+
Optional pool configuration (uses Config.default if not provided) *)
(** {1 Connection Usage} *)
···
(** Acquire connection, use it, automatically release back to pool.
If idle connection available and healthy:
-
- Reuse from pool (validates health first)
-
Else:
-
- Create new connection (may block if endpoint at limit)
+
- Reuse from pool (validates health first) Else:
+
- Create new connection (may block if endpoint at limit)
-
On success: connection returned to pool for reuse
-
On error: connection closed, not returned to pool
+
On success: connection returned to pool for reuse On error: connection
+
closed, not returned to pool
Example:
{[
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
Conpool.with_connection pool endpoint (fun conn ->
-
(* Use conn for HTTP request, Redis command, etc. *)
-
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
-
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
-
Eio.Buf_read.take_all buf
-
)
-
]}
-
*)
+
(* Use conn for HTTP request, Redis command, etc. *)
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf)
+
]} *)
(** {1 Statistics & Monitoring} *)
-
val stats :
-
t ->
-
Endpoint.t ->
-
Stats.t
+
val stats : t -> Endpoint.t -> Stats.t
(** Get statistics for specific endpoint *)
-
val all_stats :
-
t ->
-
(Endpoint.t * Stats.t) list
+
val all_stats : t -> (Endpoint.t * Stats.t) list
(** Get statistics for all endpoints in pool *)
(** {1 Pool Management} *)
-
val clear_endpoint :
-
t ->
-
Endpoint.t ->
-
unit
+
val clear_endpoint : t -> Endpoint.t -> unit
(** Clear all cached connections for a specific endpoint.
This removes the endpoint from the pool, discarding all idle connections.
Active connections will continue to work but won't be returned to the pool.
-
Use this when you know an endpoint's connections are no longer valid
-
(e.g., server restarted, network reconfigured, credentials changed).
+
Use this when you know an endpoint's connections are no longer valid (e.g.,
+
server restarted, network reconfigured, credentials changed).
The pool will be automatically cleaned up when its switch is released. *)
+14 -15
lib/endpoint.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Network endpoint representation *)
-
let src = Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
+
let src =
+
Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
+
module Log = (val Logs.src_log src : Logs.LOG)
-
type t = {
-
host : string;
-
port : int;
-
}
+
type t = { host : string; port : int }
let make ~host ~port =
(* Validate port range *)
if port < 1 || port > 65535 then
-
invalid_arg (Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
+
invalid_arg
+
(Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
(* Validate hostname is not empty *)
-
if String.trim host = "" then
-
invalid_arg "Hostname cannot be empty";
+
if String.trim host = "" then invalid_arg "Hostname cannot be empty";
Log.debug (fun m -> m "Creating endpoint: %s:%d" host port);
{ host; port }
let host t = t.host
let port t = t.port
-
-
let equal t1 t2 =
-
String.equal t1.host t2.host && t1.port = t2.port
-
-
let hash t =
-
Hashtbl.hash (t.host, t.port)
-
+
let equal t1 t2 = String.equal t1.host t2.host && t1.port = t2.port
+
let hash t = Hashtbl.hash (t.host, t.port)
let pp = Fmt.of_to_string (fun t -> Printf.sprintf "%s:%d" t.host t.port)
+7 -3
lib/endpoint.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Network endpoint representation *)
(** {1 Logging} *)
···
val src : Logs.Src.t
(** Logs source for endpoint operations. Configure logging with:
{[
-
Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug);
-
]}
-
*)
+
Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug)
+
]} *)
(** {1 Type} *)
+6 -6
lib/stats.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Statistics for connection pool endpoints *)
type t = {
···
- Reused: %d@,\
- Closed: %d@,\
- Errors: %d@]"
-
t.active
-
t.idle
-
t.total_created
-
t.total_reused
-
t.total_closed
-
t.errors
+
t.active t.idle t.total_created t.total_reused t.total_closed t.errors
+5
lib/stats.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** Statistics for connection pool endpoints *)
(** {1 Type} *)
+9 -6
lib/tls_config.ml
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** TLS configuration for connection pools *)
let src = Logs.Src.create "conpool.tls" ~doc:"Connection pool TLS configuration"
+
module Log = (val Logs.src_log src : Logs.LOG)
-
type t = {
-
config : Tls.Config.client;
-
servername : string option;
-
}
+
type t = { config : Tls.Config.client; servername : string option }
let make ~config ?servername () =
Log.debug (fun m ->
-
m "Creating TLS config with servername: %s"
-
(match servername with Some s -> s | None -> "<default>"));
+
m "Creating TLS config with servername: %s"
+
(match servername with Some s -> s | None -> "<default>"));
{ config; servername }
let config t = t.config
+9 -4
lib/tls_config.mli
···
+
(*---------------------------------------------------------------------------
+
Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
+
SPDX-License-Identifier: ISC
+
---------------------------------------------------------------------------*)
+
(** TLS configuration for connection pools *)
(** {1 Logging} *)
···
val src : Logs.Src.t
(** Logs source for TLS configuration operations. Configure logging with:
{[
-
Logs.Src.set_level Conpool.Tls_config.src (Some Logs.Debug);
-
]}
-
*)
+
Logs.Src.set_level Conpool.Tls_config.src (Some Logs.Debug)
+
]} *)
(** {1 Type} *)
···
(** Create TLS configuration.
@param config TLS client configuration for all connections
-
@param servername Optional SNI server name override. If [None], uses the endpoint's hostname
+
@param servername
+
Optional SNI server name override. If [None], uses the endpoint's hostname
*)
(** {1 Accessors} *)