TCP/TLS connection pooling for Eio

split out modules

+2
conpool.opam
···
"eio"
"tls-eio" {>= "1.0"}
"logs"
"odoc" {with-doc}
]
build: [
···
"eio"
"tls-eio" {>= "1.0"}
"logs"
+
"fmt"
+
"cmdliner"
"odoc" {with-doc}
]
build: [
+3 -1
dune-project
···
(dune (>= 3.0))
eio
(tls-eio (>= 1.0))
-
logs))
···
(dune (>= 3.0))
eio
(tls-eio (>= 1.0))
+
logs
+
fmt
+
cmdliner))
+52
lib/cmd.ml
···
···
+
(** Cmdliner terms for connection pool configuration *)
+
+
open Cmdliner
+
+
let max_connections_per_endpoint =
+
let doc = "Maximum concurrent connections per endpoint." in
+
Arg.(value & opt int 10 & info ["max-connections-per-endpoint"] ~doc ~docv:"NUM")
+
+
let max_idle_time =
+
let doc = "Maximum time a connection can sit idle in seconds." in
+
Arg.(value & opt float 60.0 & info ["max-idle-time"] ~doc ~docv:"SECONDS")
+
+
let max_connection_lifetime =
+
let doc = "Maximum connection age in seconds." in
+
Arg.(value & opt float 300.0 & info ["max-connection-lifetime"] ~doc ~docv:"SECONDS")
+
+
let max_connection_uses =
+
let doc = "Maximum times a connection can be reused (omit for unlimited)." in
+
Arg.(value & opt (some int) None & info ["max-connection-uses"] ~doc ~docv:"NUM")
+
+
let connect_timeout =
+
let doc = "Connection timeout in seconds." in
+
Arg.(value & opt float 10.0 & info ["connect-timeout"] ~doc ~docv:"SECONDS")
+
+
let connect_retry_count =
+
let doc = "Number of connection retry attempts." in
+
Arg.(value & opt int 3 & info ["connect-retry-count"] ~doc ~docv:"NUM")
+
+
let connect_retry_delay =
+
let doc = "Initial retry delay in seconds (with exponential backoff)." in
+
Arg.(value & opt float 0.1 & info ["connect-retry-delay"] ~doc ~docv:"SECONDS")
+
+
let config =
+
let make max_conn max_idle max_lifetime max_uses timeout retry_count retry_delay =
+
Config.make
+
~max_connections_per_endpoint:max_conn
+
~max_idle_time:max_idle
+
~max_connection_lifetime:max_lifetime
+
?max_connection_uses:max_uses
+
~connect_timeout:timeout
+
~connect_retry_count:retry_count
+
~connect_retry_delay:retry_delay
+
()
+
in
+
Term.(const make
+
$ max_connections_per_endpoint
+
$ max_idle_time
+
$ max_connection_lifetime
+
$ max_connection_uses
+
$ connect_timeout
+
$ connect_retry_count
+
$ connect_retry_delay)
+45
lib/cmd.mli
···
···
+
(** Cmdliner terms for connection pool configuration *)
+
+
(** {1 Configuration Terms} *)
+
+
val max_connections_per_endpoint : int Cmdliner.Term.t
+
(** Cmdliner term for maximum connections per endpoint.
+
Default: 10
+
Flag: [--max-connections-per-endpoint] *)
+
+
val max_idle_time : float Cmdliner.Term.t
+
(** Cmdliner term for maximum idle time in seconds.
+
Default: 60.0
+
Flag: [--max-idle-time] *)
+
+
val max_connection_lifetime : float Cmdliner.Term.t
+
(** Cmdliner term for maximum connection lifetime in seconds.
+
Default: 300.0
+
Flag: [--max-connection-lifetime] *)
+
+
val max_connection_uses : int option Cmdliner.Term.t
+
(** Cmdliner term for maximum connection uses.
+
Default: None (unlimited)
+
Flag: [--max-connection-uses] *)
+
+
val connect_timeout : float Cmdliner.Term.t
+
(** Cmdliner term for connection timeout in seconds.
+
Default: 10.0
+
Flag: [--connect-timeout] *)
+
+
val connect_retry_count : int Cmdliner.Term.t
+
(** Cmdliner term for number of connection retry attempts.
+
Default: 3
+
Flag: [--connect-retry-count] *)
+
+
val connect_retry_delay : float Cmdliner.Term.t
+
(** Cmdliner term for initial retry delay in seconds.
+
Default: 0.1
+
Flag: [--connect-retry-delay] *)
+
+
(** {1 Combined Terms} *)
+
+
val config : Config.t Cmdliner.Term.t
+
(** Cmdliner term that combines all configuration options into a {!Config.t}.
+
This term can be used in your application's main command to accept
+
all connection pool configuration options from the command line. *)
+80
lib/config.ml
···
···
+
(** Configuration for connection pools *)
+
+
let src = Logs.Src.create "conpool.config" ~doc:"Connection pool configuration"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
type t = {
+
max_connections_per_endpoint : int;
+
max_idle_time : float;
+
max_connection_lifetime : float;
+
max_connection_uses : int option;
+
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
+
connect_timeout : float option;
+
connect_retry_count : int;
+
connect_retry_delay : float;
+
on_connection_created : (Endpoint.t -> unit) option;
+
on_connection_closed : (Endpoint.t -> unit) option;
+
on_connection_reused : (Endpoint.t -> unit) option;
+
}
+
+
let make
+
?(max_connections_per_endpoint = 10)
+
?(max_idle_time = 60.0)
+
?(max_connection_lifetime = 300.0)
+
?max_connection_uses
+
?health_check
+
?(connect_timeout = 10.0)
+
?(connect_retry_count = 3)
+
?(connect_retry_delay = 0.1)
+
?on_connection_created
+
?on_connection_closed
+
?on_connection_reused
+
() =
+
Log.debug (fun m ->
+
m "Creating config: max_connections=%d, max_idle=%.1fs, max_lifetime=%.1fs"
+
max_connections_per_endpoint max_idle_time max_connection_lifetime);
+
{
+
max_connections_per_endpoint;
+
max_idle_time;
+
max_connection_lifetime;
+
max_connection_uses;
+
health_check;
+
connect_timeout = Some connect_timeout;
+
connect_retry_count;
+
connect_retry_delay;
+
on_connection_created;
+
on_connection_closed;
+
on_connection_reused;
+
}
+
+
let default = make ()
+
+
let max_connections_per_endpoint t = t.max_connections_per_endpoint
+
let max_idle_time t = t.max_idle_time
+
let max_connection_lifetime t = t.max_connection_lifetime
+
let max_connection_uses t = t.max_connection_uses
+
let health_check t = t.health_check
+
let connect_timeout t = t.connect_timeout
+
let connect_retry_count t = t.connect_retry_count
+
let connect_retry_delay t = t.connect_retry_delay
+
let on_connection_created t = t.on_connection_created
+
let on_connection_closed t = t.on_connection_closed
+
let on_connection_reused t = t.on_connection_reused
+
+
let pp ppf t =
+
Fmt.pf ppf
+
"@[<v>Config:@,\
+
- max_connections_per_endpoint: %d@,\
+
- max_idle_time: %.1fs@,\
+
- max_connection_lifetime: %.1fs@,\
+
- max_connection_uses: %s@,\
+
- connect_timeout: %s@,\
+
- connect_retry_count: %d@,\
+
- connect_retry_delay: %.2fs@]"
+
t.max_connections_per_endpoint
+
t.max_idle_time
+
t.max_connection_lifetime
+
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
+
(match t.connect_timeout with Some f -> Fmt.str "%.1fs" f | None -> "none")
+
t.connect_retry_count
+
t.connect_retry_delay
+98
lib/config.mli
···
···
+
(** Configuration for connection pools *)
+
+
(** {1 Logging} *)
+
+
val src : Logs.Src.t
+
(** Logs source for configuration operations. Configure logging with:
+
{[
+
Logs.Src.set_level Conpool.Config.src (Some Logs.Debug);
+
]}
+
*)
+
+
(** {1 Type} *)
+
+
type t
+
(** Pool configuration *)
+
+
(** {1 Construction} *)
+
+
val make :
+
?max_connections_per_endpoint:int ->
+
?max_idle_time:float ->
+
?max_connection_lifetime:float ->
+
?max_connection_uses:int ->
+
?health_check:([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) ->
+
?connect_timeout:float ->
+
?connect_retry_count:int ->
+
?connect_retry_delay:float ->
+
?on_connection_created:(Endpoint.t -> unit) ->
+
?on_connection_closed:(Endpoint.t -> unit) ->
+
?on_connection_reused:(Endpoint.t -> unit) ->
+
unit -> t
+
(** Create pool configuration with optional parameters.
+
+
@param max_connections_per_endpoint Maximum concurrent connections per endpoint (default: 10)
+
@param max_idle_time Maximum time a connection can sit idle in seconds (default: 60.0)
+
@param max_connection_lifetime Maximum connection age in seconds (default: 300.0)
+
@param max_connection_uses Maximum times a connection can be reused (default: unlimited)
+
@param health_check Custom health check function (default: none)
+
@param connect_timeout Connection timeout in seconds (default: 10.0)
+
@param connect_retry_count Number of connection retry attempts (default: 3)
+
@param connect_retry_delay Initial retry delay in seconds, with exponential backoff (default: 0.1)
+
@param on_connection_created Hook called when a connection is created
+
@param on_connection_closed Hook called when a connection is closed
+
@param on_connection_reused Hook called when a connection is reused
+
*)
+
+
val default : t
+
(** Sensible defaults for most use cases:
+
- max_connections_per_endpoint: 10
+
- max_idle_time: 60.0s
+
- max_connection_lifetime: 300.0s
+
- max_connection_uses: unlimited
+
- health_check: none
+
- connect_timeout: 10.0s
+
- connect_retry_count: 3
+
- connect_retry_delay: 0.1s
+
- hooks: none
+
*)
+
+
(** {1 Accessors} *)
+
+
val max_connections_per_endpoint : t -> int
+
(** Get maximum connections per endpoint. *)
+
+
val max_idle_time : t -> float
+
(** Get maximum idle time in seconds. *)
+
+
val max_connection_lifetime : t -> float
+
(** Get maximum connection lifetime in seconds. *)
+
+
val max_connection_uses : t -> int option
+
(** Get maximum connection uses, if any. *)
+
+
val health_check : t -> ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option
+
(** Get custom health check function, if any. *)
+
+
val connect_timeout : t -> float option
+
(** Get connection timeout in seconds, if any. *)
+
+
val connect_retry_count : t -> int
+
(** Get number of connection retry attempts. *)
+
+
val connect_retry_delay : t -> float
+
(** Get initial retry delay in seconds. *)
+
+
val on_connection_created : t -> (Endpoint.t -> unit) option
+
(** Get connection created hook, if any. *)
+
+
val on_connection_closed : t -> (Endpoint.t -> unit) option
+
(** Get connection closed hook, if any. *)
+
+
val on_connection_reused : t -> (Endpoint.t -> unit) option
+
(** Get connection reused hook, if any. *)
+
+
(** {1 Pretty-printing} *)
+
+
val pp : t Fmt.t
+
(** Pretty-printer for configuration. *)
+24
lib/connection.ml
···
···
+
(** Internal connection representation - not exposed in public API *)
+
+
let src = Logs.Src.create "conpool.connection" ~doc:"Connection pool internal connection management"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
type t = {
+
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
+
created_at : float;
+
mutable last_used : float;
+
mutable use_count : int;
+
endpoint : Endpoint.t;
+
}
+
+
let flow t = t.flow
+
let endpoint t = t.endpoint
+
let created_at t = t.created_at
+
let last_used t = t.last_used
+
let use_count t = t.use_count
+
+
let pp ppf t =
+
Fmt.pf ppf "Connection(endpoint=%a, age=%.2fs, uses=%d)"
+
Endpoint.pp t.endpoint
+
(Unix.gettimeofday () -. t.created_at)
+
t.use_count
+104 -251
lib/conpool.ml
···
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
module Log = (val Logs.src_log src : Logs.LOG)
-
module Endpoint = struct
-
type t = {
-
host : string;
-
port : int;
-
}
-
-
let make ~host ~port = { host; port }
-
-
let host t = t.host
-
let port t = t.port
-
-
let pp fmt t =
-
Format.fprintf fmt "%s:%d" t.host t.port
-
-
let equal t1 t2 =
-
String.equal t1.host t2.host && t1.port = t2.port
-
-
let hash t =
-
Hashtbl.hash (t.host, t.port)
-
end
-
-
module Tls_config = struct
-
type t = {
-
config : Tls.Config.client;
-
servername : string option;
-
}
-
-
let make ~config ?servername () = { config; servername }
-
-
let config t = t.config
-
let servername t = t.servername
-
-
let pp fmt t =
-
Format.fprintf fmt "TLS(servername=%s)"
-
(match t.servername with Some s -> s | None -> "<default>")
-
end
-
-
(* Internal connection type - not exposed in public API *)
-
module Connection = struct
-
type t = {
-
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
-
created_at : float;
-
mutable last_used : float;
-
mutable use_count : int;
-
endpoint : Endpoint.t;
-
}
-
-
let flow t = t.flow
-
let endpoint t = t.endpoint
-
let created_at t = t.created_at
-
let last_used t = t.last_used
-
let use_count t = t.use_count
-
end
-
-
module Config = struct
-
type t = {
-
max_connections_per_endpoint : int;
-
max_idle_time : float;
-
max_connection_lifetime : float;
-
max_connection_uses : int option;
-
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
-
connect_timeout : float option;
-
connect_retry_count : int;
-
connect_retry_delay : float;
-
on_connection_created : (Endpoint.t -> unit) option;
-
on_connection_closed : (Endpoint.t -> unit) option;
-
on_connection_reused : (Endpoint.t -> unit) option;
-
}
-
-
let make
-
?(max_connections_per_endpoint = 10)
-
?(max_idle_time = 60.0)
-
?(max_connection_lifetime = 300.0)
-
?max_connection_uses
-
?health_check
-
?(connect_timeout = 10.0)
-
?(connect_retry_count = 3)
-
?(connect_retry_delay = 0.1)
-
?on_connection_created
-
?on_connection_closed
-
?on_connection_reused
-
() =
-
{
-
max_connections_per_endpoint;
-
max_idle_time;
-
max_connection_lifetime;
-
max_connection_uses;
-
health_check;
-
connect_timeout = Some connect_timeout;
-
connect_retry_count;
-
connect_retry_delay;
-
on_connection_created;
-
on_connection_closed;
-
on_connection_reused;
-
}
-
-
let default = make ()
-
-
let max_connections_per_endpoint t = t.max_connections_per_endpoint
-
let max_idle_time t = t.max_idle_time
-
let max_connection_lifetime t = t.max_connection_lifetime
-
let max_connection_uses t = t.max_connection_uses
-
let health_check t = t.health_check
-
let connect_timeout t = t.connect_timeout
-
let connect_retry_count t = t.connect_retry_count
-
let connect_retry_delay t = t.connect_retry_delay
-
-
let pp fmt t =
-
Format.fprintf fmt
-
"@[<v>Config:@,\
-
- max_connections_per_endpoint: %d@,\
-
- max_idle_time: %.1fs@,\
-
- max_connection_lifetime: %.1fs@,\
-
- max_connection_uses: %s@,\
-
- connect_timeout: %s@,\
-
- connect_retry_count: %d@,\
-
- connect_retry_delay: %.2fs@]"
-
t.max_connections_per_endpoint
-
t.max_idle_time
-
t.max_connection_lifetime
-
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
-
(match t.connect_timeout with Some f -> Printf.sprintf "%.1fs" f | None -> "none")
-
t.connect_retry_count
-
t.connect_retry_delay
-
end
-
-
module Stats = struct
-
type t = {
-
active : int;
-
idle : int;
-
total_created : int;
-
total_reused : int;
-
total_closed : int;
-
errors : int;
-
}
-
-
let active t = t.active
-
let idle t = t.idle
-
let total_created t = t.total_created
-
let total_reused t = t.total_reused
-
let total_closed t = t.total_closed
-
let errors t = t.errors
-
-
let pp fmt t =
-
Format.fprintf fmt
-
"@[<v>Stats:@,\
-
- Active: %d@,\
-
- Idle: %d@,\
-
- Created: %d@,\
-
- Reused: %d@,\
-
- Closed: %d@,\
-
- Errors: %d@]"
-
t.active
-
t.idle
-
t.total_created
-
t.total_reused
-
t.total_closed
-
t.errors
-
end
type endp_stats = {
mutable active : int;
···
mutex : Eio.Mutex.t;
}
-
type ('clock, 'net) t = {
sw : Eio.Switch.t;
net : 'net;
clock : 'clock;
···
endpoints_mutex : Eio.Mutex.t;
}
module EndpointTbl = Hashtbl.Make(struct
type t = Endpoint.t
let equal = Endpoint.equal
let hash = Endpoint.hash
end)
-
let get_time pool =
Eio.Time.now pool.clock
let create_endp_stats () = {
···
errors = 0;
}
-
let snapshot_stats (stats : endp_stats) : Stats.t = {
-
active = stats.active;
-
idle = stats.idle;
-
total_created = stats.total_created;
-
total_reused = stats.total_reused;
-
total_closed = stats.total_closed;
-
errors = stats.errors;
-
}
(** {1 DNS Resolution} *)
-
let resolve_endpoint pool endpoint =
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
···
(** {1 Connection Creation with Retry} *)
-
let rec create_connection_with_retry pool endpoint attempt =
-
if attempt > pool.config.connect_retry_count then begin
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
-
Endpoint.pp endpoint pool.config.connect_retry_count);
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
-
(Endpoint.host endpoint) (Endpoint.port endpoint) pool.config.connect_retry_count)
end;
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
-
Endpoint.pp endpoint attempt pool.config.connect_retry_count);
try
let addr = resolve_endpoint pool endpoint in
···
(* Connect with optional timeout *)
let socket =
-
match pool.config.connect_timeout with
| Some timeout ->
Eio.Time.with_timeout_exn pool.clock timeout
(fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
···
| Eio.Time.Timeout ->
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
(* Exponential backoff *)
-
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
| e ->
(* Other errors - retry with backoff *)
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
attempt Endpoint.pp endpoint (Printexc.to_string e));
-
if attempt < pool.config.connect_retry_count then (
-
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
) else
raise e
-
let create_connection pool endpoint =
create_connection_with_retry pool endpoint 1
(** {1 Connection Validation} *)
-
let is_healthy pool ?(check_readable = false) conn =
let now = get_time pool in
(* Check age *)
let age = now -. Connection.created_at conn in
-
if age > pool.config.max_connection_lifetime then begin
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) age pool.config.max_connection_lifetime);
false
end
(* Check idle time *)
-
else if (now -. Connection.last_used conn) > pool.config.max_idle_time then begin
-
let idle_time = now -. Connection.last_used conn in
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
-
Endpoint.pp (Connection.endpoint conn) idle_time pool.config.max_idle_time);
-
false
-
end
-
(* Check use count *)
-
else if (match pool.config.max_connection_uses with
-
| Some max -> Connection.use_count conn >= max
-
| None -> false) then begin
-
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
-
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
-
false
-
end
-
(* Optional: custom health check *)
-
else if (match pool.config.health_check with
-
| Some check ->
-
(try
-
let healthy = check (Connection.flow conn) in
-
if not healthy then
-
Log.debug (fun m -> m "Connection to %a failed custom health check"
-
Endpoint.pp (Connection.endpoint conn));
-
not healthy
-
with e ->
-
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
-
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
-
true) (* Exception in health check = unhealthy *)
-
| None -> false) then
-
false
-
(* Optional: check if socket still connected *)
-
else if check_readable then
-
try
-
(* TODO avsm: a sockopt for this? *)
true
-
with
-
| _ -> false
-
-
else begin
-
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
-
Endpoint.pp (Connection.endpoint conn)
-
age
-
(now -. Connection.last_used conn)
-
(Connection.use_count conn));
-
true
end
(** {1 Internal Pool Operations} *)
-
let close_internal pool conn =
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
Endpoint.pp (Connection.endpoint conn)
(get_time pool -. Connection.created_at conn)
···
);
(* Call hook if configured *)
-
Option.iter (fun f -> f (Connection.endpoint conn)) pool.config.on_connection_closed
-
let get_or_create_endpoint_pool pool endpoint =
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
(* First try with read lock *)
···
let mutex = Eio.Mutex.create () in
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
-
Endpoint.pp endpoint pool.config.max_connections_per_endpoint);
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
let eio_pool = Eio.Pool.create
-
pool.config.max_connections_per_endpoint
~validate:(fun conn ->
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
(* Called before reusing from pool *)
···
);
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) pool.config.on_connection_reused;
(* Run health check if configured *)
-
match pool.config.health_check with
| Some check ->
(try check (Connection.flow conn)
with _ -> false)
···
);
(* Call hook if configured *)
-
Option.iter (fun f -> f endpoint) pool.config.on_connection_created;
conn
with e ->
···
(** {1 Public API - Pool Creation} *)
-
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
-
config.max_connections_per_endpoint
-
config.max_idle_time
-
config.max_connection_lifetime);
let pool = {
sw;
···
)
);
-
pool
(** {1 Public API - Connection Management} *)
-
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint f =
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
let ep_pool = get_or_create_endpoint_pool pool endpoint in
···
(** {1 Public API - Statistics} *)
-
let stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
match Hashtbl.find_opt pool.endpoints endpoint with
| Some ep_pool ->
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
···
)
| None ->
(* No pool for this endpoint yet *)
-
{
-
Stats.active = 0;
-
idle = 0;
-
total_created = 0;
-
total_reused = 0;
-
total_closed = 0;
-
errors = 0;
-
}
-
let all_stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
Hashtbl.fold (fun endpoint ep_pool acc ->
let stats = Eio.Mutex.use_ro ep_pool.mutex (fun () ->
···
(** {1 Public API - Pool Management} *)
-
let clear_endpoint (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
match Hashtbl.find_opt pool.endpoints endpoint with
| Some _ep_pool ->
···
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
module Log = (val Logs.src_log src : Logs.LOG)
+
(* Re-export submodules *)
+
module Endpoint = Endpoint
+
module Tls_config = Tls_config
+
module Config = Config
+
module Stats = Stats
+
module Cmd = Cmd
type endp_stats = {
mutable active : int;
···
mutex : Eio.Mutex.t;
}
+
type ('clock, 'net) internal = {
sw : Eio.Switch.t;
net : 'net;
clock : 'clock;
···
endpoints_mutex : Eio.Mutex.t;
}
+
type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t
+
module EndpointTbl = Hashtbl.Make(struct
type t = Endpoint.t
let equal = Endpoint.equal
let hash = Endpoint.hash
end)
+
let get_time (pool : ('clock, 'net) internal) =
Eio.Time.now pool.clock
let create_endp_stats () = {
···
errors = 0;
}
+
let snapshot_stats (stats : endp_stats) : Stats.t =
+
Stats.make
+
~active:stats.active
+
~idle:stats.idle
+
~total_created:stats.total_created
+
~total_reused:stats.total_reused
+
~total_closed:stats.total_closed
+
~errors:stats.errors
(** {1 DNS Resolution} *)
+
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
···
(** {1 Connection Creation with Retry} *)
+
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint attempt =
+
let retry_count = Config.connect_retry_count pool.config in
+
if attempt > retry_count then begin
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
+
Endpoint.pp endpoint retry_count);
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
+
(Endpoint.host endpoint) (Endpoint.port endpoint) retry_count)
end;
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
+
Endpoint.pp endpoint attempt retry_count);
try
let addr = resolve_endpoint pool endpoint in
···
(* Connect with optional timeout *)
let socket =
+
match Config.connect_timeout pool.config with
| Some timeout ->
Eio.Time.with_timeout_exn pool.clock timeout
(fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
···
| Eio.Time.Timeout ->
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
(* Exponential backoff *)
+
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
| e ->
(* Other errors - retry with backoff *)
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
attempt Endpoint.pp endpoint (Printexc.to_string e));
+
if attempt < Config.connect_retry_count pool.config then (
+
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
) else
raise e
+
let create_connection (pool : ('clock, 'net) internal) endpoint =
create_connection_with_retry pool endpoint 1
(** {1 Connection Validation} *)
+
let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn =
let now = get_time pool in
(* Check age *)
let age = now -. Connection.created_at conn in
+
let max_lifetime = Config.max_connection_lifetime pool.config in
+
if age > max_lifetime then begin
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) age max_lifetime);
false
end
(* Check idle time *)
+
else begin
+
let max_idle = Config.max_idle_time pool.config in
+
if (now -. Connection.last_used conn) > max_idle then begin
+
let idle_time = now -. Connection.last_used conn in
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
+
false
+
end
+
(* Check use count *)
+
else if (match Config.max_connection_uses pool.config with
+
| Some max -> Connection.use_count conn >= max
+
| None -> false) then begin
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
+
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
+
false
+
end
+
(* Optional: custom health check *)
+
else if (match Config.health_check pool.config with
+
| Some check ->
+
(try
+
let healthy = check (Connection.flow conn) in
+
if not healthy then
+
Log.debug (fun m -> m "Connection to %a failed custom health check"
+
Endpoint.pp (Connection.endpoint conn));
+
not healthy
+
with e ->
+
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
+
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
+
true) (* Exception in health check = unhealthy *)
+
| None -> false) then
+
false
+
+
(* Optional: check if socket still connected *)
+
else if check_readable then
+
try
+
(* TODO avsm: a sockopt for this? *)
+
true
+
with
+
| _ -> false
+
else begin
+
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
+
Endpoint.pp (Connection.endpoint conn)
+
age
+
(now -. Connection.last_used conn)
+
(Connection.use_count conn));
true
+
end
end
(** {1 Internal Pool Operations} *)
+
let close_internal (pool : ('clock, 'net) internal) conn =
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
Endpoint.pp (Connection.endpoint conn)
(get_time pool -. Connection.created_at conn)
···
);
(* Call hook if configured *)
+
Option.iter (fun f -> f (Connection.endpoint conn)) (Config.on_connection_closed pool.config)
+
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
(* First try with read lock *)
···
let mutex = Eio.Mutex.create () in
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
+
Endpoint.pp endpoint (Config.max_connections_per_endpoint pool.config));
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
let eio_pool = Eio.Pool.create
+
(Config.max_connections_per_endpoint pool.config)
~validate:(fun conn ->
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
(* Called before reusing from pool *)
···
);
(* Call hook if configured *)
+
Option.iter (fun f -> f endpoint) (Config.on_connection_reused pool.config);
(* Run health check if configured *)
+
match Config.health_check pool.config with
| Some check ->
(try check (Connection.flow conn)
with _ -> false)
···
);
(* Call hook if configured *)
+
Option.iter (fun f -> f endpoint) (Config.on_connection_created pool.config);
conn
with e ->
···
(** {1 Public API - Pool Creation} *)
+
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : t =
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
+
(Config.max_connections_per_endpoint config)
+
(Config.max_idle_time config)
+
(Config.max_connection_lifetime config));
let pool = {
sw;
···
)
);
+
T pool
(** {1 Public API - Connection Management} *)
+
let with_connection (T pool) endpoint f =
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
let ep_pool = get_or_create_endpoint_pool pool endpoint in
···
(** {1 Public API - Statistics} *)
+
let stats (T pool) endpoint =
match Hashtbl.find_opt pool.endpoints endpoint with
| Some ep_pool ->
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
···
)
| None ->
(* No pool for this endpoint yet *)
+
Stats.make
+
~active:0
+
~idle:0
+
~total_created:0
+
~total_reused:0
+
~total_closed:0
+
~errors:0
+
let all_stats (T pool) =
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
Hashtbl.fold (fun endpoint ep_pool acc ->
let stats = Eio.Mutex.use_ro ep_pool.mutex (fun () ->
···
(** {1 Public API - Pool Management} *)
+
let clear_endpoint (T pool) endpoint =
Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
match Hashtbl.find_opt pool.endpoints endpoint with
| Some _ep_pool ->
+23 -121
lib/conpool.mli
···
(** {1 Logging} *)
val src : Logs.Src.t
-
(** Logs source for conpool. Configure logging with:
{[
Logs.Src.set_level Conpool.src (Some Logs.Debug);
Logs.set_reporter (Logs_fmt.reporter ());
]}
*)
(** {1 Core Types} *)
-
(** Network endpoint *)
-
module Endpoint : sig
-
type t
-
(** Network endpoint identified by host and port *)
-
-
val make : host:string -> port:int -> t
-
(** Create an endpoint *)
-
-
val host : t -> string
-
(** Get the hostname *)
-
-
val port : t -> int
-
(** Get the port number *)
-
-
val pp : Format.formatter -> t -> unit
-
(** Pretty-print an endpoint *)
-
-
val equal : t -> t -> bool
-
(** Compare two endpoints for equality *)
-
-
val hash : t -> int
-
(** Hash an endpoint *)
-
end
-
-
(** TLS configuration *)
-
module Tls_config : sig
-
type t
-
(** TLS configuration applied to all connections in a pool *)
-
-
val make : config:Tls.Config.client -> ?servername:string -> unit -> t
-
(** Create TLS configuration.
-
@param config TLS client configuration
-
@param servername Optional SNI server name override. If None, uses endpoint host *)
-
-
val config : t -> Tls.Config.client
-
(** Get the TLS client configuration *)
-
-
val servername : t -> string option
-
(** Get the SNI server name override *)
-
-
val pp : Format.formatter -> t -> unit
-
(** Pretty-print TLS configuration *)
-
end
-
-
-
(** Pool configuration *)
-
module Config : sig
-
type t
-
(** Pool configuration *)
-
-
val make :
-
?max_connections_per_endpoint:int ->
-
?max_idle_time:float ->
-
?max_connection_lifetime:float ->
-
?max_connection_uses:int ->
-
?health_check:([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) ->
-
?connect_timeout:float ->
-
?connect_retry_count:int ->
-
?connect_retry_delay:float ->
-
?on_connection_created:(Endpoint.t -> unit) ->
-
?on_connection_closed:(Endpoint.t -> unit) ->
-
?on_connection_reused:(Endpoint.t -> unit) ->
-
unit -> t
-
(** Create pool configuration with optional parameters.
-
See field descriptions for defaults. *)
-
-
val default : t
-
(** Sensible defaults for most use cases:
-
- max_connections_per_endpoint: 10
-
- max_idle_time: 60.0s
-
- max_connection_lifetime: 300.0s
-
- max_connection_uses: None (unlimited)
-
- health_check: None
-
- connect_timeout: 10.0s
-
- connect_retry_count: 3
-
- connect_retry_delay: 0.1s
-
- hooks: None *)
-
-
val max_connections_per_endpoint : t -> int
-
val max_idle_time : t -> float
-
val max_connection_lifetime : t -> float
-
val max_connection_uses : t -> int option
-
val health_check : t -> ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option
-
val connect_timeout : t -> float option
-
val connect_retry_count : t -> int
-
val connect_retry_delay : t -> float
-
-
val pp : Format.formatter -> t -> unit
-
(** Pretty-print configuration *)
-
end
-
-
(** Statistics for an endpoint *)
-
module Stats : sig
-
type t
-
(** Statistics for a specific endpoint *)
-
-
val active : t -> int
-
(** Connections currently in use *)
-
-
val idle : t -> int
-
(** Connections in pool waiting to be reused *)
-
val total_created : t -> int
-
(** Total connections created (lifetime) *)
-
-
val total_reused : t -> int
-
(** Total times connections were reused *)
-
val total_closed : t -> int
-
(** Total connections closed *)
-
val errors : t -> int
-
(** Total connection errors *)
-
val pp : Format.formatter -> t -> unit
-
(** Pretty-print endpoint statistics *)
-
end
(** {1 Connection Pool} *)
-
type ('clock, 'net) t
-
(** Connection pool managing multiple endpoints, parameterized by clock and network types *)
val create :
sw:Eio.Switch.t ->
···
clock:'clock Eio.Time.clock ->
?tls:Tls_config.t ->
?config:Config.t ->
-
unit -> ('clock Eio.Time.clock, 'net Eio.Net.t) t
(** Create connection pool bound to switch.
All connections will be closed when switch is released.
···
(** {1 Connection Usage} *)
val with_connection :
-
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
Endpoint.t ->
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> 'a) ->
'a
···
(** {1 Statistics & Monitoring} *)
val stats :
-
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
Endpoint.t ->
Stats.t
(** Get statistics for specific endpoint *)
val all_stats :
-
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
(Endpoint.t * Stats.t) list
(** Get statistics for all endpoints in pool *)
(** {1 Pool Management} *)
val clear_endpoint :
-
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
Endpoint.t ->
unit
(** Clear all cached connections for a specific endpoint.
···
(** {1 Logging} *)
val src : Logs.Src.t
+
(** Logs source for the main connection pool. Configure logging with:
{[
Logs.Src.set_level Conpool.src (Some Logs.Debug);
Logs.set_reporter (Logs_fmt.reporter ());
]}
+
+
Each submodule also exposes its own log source for fine-grained control:
+
- {!Endpoint.src} - endpoint operations
+
- {!Tls_config.src} - TLS configuration
+
- {!Config.src} - pool configuration
*)
(** {1 Core Types} *)
+
(** Network endpoint representation *)
+
module Endpoint : module type of Endpoint
+
(** TLS configuration for connection pools *)
+
module Tls_config : module type of Tls_config
+
(** Configuration for connection pools *)
+
module Config : module type of Config
+
(** Statistics for connection pool endpoints *)
+
module Stats : module type of Stats
+
(** Cmdliner terms for connection pool configuration *)
+
module Cmd : module type of Cmd
(** {1 Connection Pool} *)
+
type t
+
(** Connection pool managing multiple endpoints *)
val create :
sw:Eio.Switch.t ->
···
clock:'clock Eio.Time.clock ->
?tls:Tls_config.t ->
?config:Config.t ->
+
unit -> t
(** Create connection pool bound to switch.
All connections will be closed when switch is released.
···
(** {1 Connection Usage} *)
val with_connection :
+
t ->
Endpoint.t ->
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> 'a) ->
'a
···
(** {1 Statistics & Monitoring} *)
val stats :
+
t ->
Endpoint.t ->
Stats.t
(** Get statistics for specific endpoint *)
val all_stats :
+
t ->
(Endpoint.t * Stats.t) list
(** Get statistics for all endpoints in pool *)
(** {1 Pool Management} *)
val clear_endpoint :
+
t ->
Endpoint.t ->
unit
(** Clear all cached connections for a specific endpoint.
+1 -1
lib/dune
···
(library
(name conpool)
(public_name conpool)
-
(libraries eio eio.unix tls-eio logs))
···
(library
(name conpool)
(public_name conpool)
+
(libraries eio eio.unix tls-eio logs fmt cmdliner))
+24
lib/endpoint.ml
···
···
+
(** Network endpoint representation *)
+
+
let src = Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
type t = {
+
host : string;
+
port : int;
+
}
+
+
let make ~host ~port =
+
Log.debug (fun m -> m "Creating endpoint: %s:%d" host port);
+
{ host; port }
+
+
let host t = t.host
+
let port t = t.port
+
+
let equal t1 t2 =
+
String.equal t1.host t2.host && t1.port = t2.port
+
+
let hash t =
+
Hashtbl.hash (t.host, t.port)
+
+
let pp = Fmt.of_to_string (fun t -> Printf.sprintf "%s:%d" t.host t.port)
+41
lib/endpoint.mli
···
···
+
(** Network endpoint representation *)
+
+
(** {1 Logging} *)
+
+
val src : Logs.Src.t
+
(** Logs source for endpoint operations. Configure logging with:
+
{[
+
Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug);
+
]}
+
*)
+
+
(** {1 Type} *)
+
+
type t
+
(** Network endpoint identified by host and port *)
+
+
(** {1 Construction} *)
+
+
val make : host:string -> port:int -> t
+
(** Create an endpoint from a hostname and port. *)
+
+
(** {1 Accessors} *)
+
+
val host : t -> string
+
(** Get the hostname from an endpoint. *)
+
+
val port : t -> int
+
(** Get the port number from an endpoint. *)
+
+
(** {1 Comparison and Hashing} *)
+
+
val equal : t -> t -> bool
+
(** Compare two endpoints for equality. *)
+
+
val hash : t -> int
+
(** Hash an endpoint for use in hash tables. *)
+
+
(** {1 Pretty-printing} *)
+
+
val pp : t Fmt.t
+
(** Pretty-printer for endpoints. Formats as "host:port". *)
+36
lib/stats.ml
···
···
+
(** Statistics for connection pool endpoints *)
+
+
type t = {
+
active : int;
+
idle : int;
+
total_created : int;
+
total_reused : int;
+
total_closed : int;
+
errors : int;
+
}
+
+
let make ~active ~idle ~total_created ~total_reused ~total_closed ~errors =
+
{ active; idle; total_created; total_reused; total_closed; errors }
+
+
let active t = t.active
+
let idle t = t.idle
+
let total_created t = t.total_created
+
let total_reused t = t.total_reused
+
let total_closed t = t.total_closed
+
let errors t = t.errors
+
+
let pp ppf t =
+
Fmt.pf ppf
+
"@[<v>Stats:@,\
+
- Active: %d@,\
+
- Idle: %d@,\
+
- Created: %d@,\
+
- Reused: %d@,\
+
- Closed: %d@,\
+
- Errors: %d@]"
+
t.active
+
t.idle
+
t.total_created
+
t.total_reused
+
t.total_closed
+
t.errors
+43
lib/stats.mli
···
···
+
(** Statistics for connection pool endpoints *)
+
+
(** {1 Type} *)
+
+
type t
+
(** Statistics snapshot for a specific endpoint *)
+
+
(** {1 Construction} *)
+
+
val make :
+
active:int ->
+
idle:int ->
+
total_created:int ->
+
total_reused:int ->
+
total_closed:int ->
+
errors:int ->
+
t
+
(** Create a statistics snapshot. *)
+
+
(** {1 Accessors} *)
+
+
val active : t -> int
+
(** Number of connections currently in use. *)
+
+
val idle : t -> int
+
(** Number of connections in pool waiting to be reused. *)
+
+
val total_created : t -> int
+
(** Total connections created over the endpoint's lifetime. *)
+
+
val total_reused : t -> int
+
(** Total number of times connections were reused from the pool. *)
+
+
val total_closed : t -> int
+
(** Total connections that have been closed. *)
+
+
val errors : t -> int
+
(** Total connection errors encountered. *)
+
+
(** {1 Pretty-printing} *)
+
+
val pp : t Fmt.t
+
(** Pretty-printer for statistics. *)
+22
lib/tls_config.ml
···
···
+
(** TLS configuration for connection pools *)
+
+
let src = Logs.Src.create "conpool.tls" ~doc:"Connection pool TLS configuration"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
type t = {
+
config : Tls.Config.client;
+
servername : string option;
+
}
+
+
let make ~config ?servername () =
+
Log.debug (fun m ->
+
m "Creating TLS config with servername: %s"
+
(match servername with Some s -> s | None -> "<default>"));
+
{ config; servername }
+
+
let config t = t.config
+
let servername t = t.servername
+
+
let pp ppf t =
+
Fmt.pf ppf "TLS(servername=%s)"
+
(match t.servername with Some s -> s | None -> "<default>")
+37
lib/tls_config.mli
···
···
+
(** TLS configuration for connection pools *)
+
+
(** {1 Logging} *)
+
+
val src : Logs.Src.t
+
(** Logs source for TLS configuration operations. Configure logging with:
+
{[
+
Logs.Src.set_level Conpool.Tls_config.src (Some Logs.Debug);
+
]}
+
*)
+
+
(** {1 Type} *)
+
+
type t
+
(** TLS configuration applied to all connections in a pool *)
+
+
(** {1 Construction} *)
+
+
val make : config:Tls.Config.client -> ?servername:string -> unit -> t
+
(** Create TLS configuration.
+
+
@param config TLS client configuration for all connections
+
@param servername Optional SNI server name override. If [None], uses the endpoint's hostname
+
*)
+
+
(** {1 Accessors} *)
+
+
val config : t -> Tls.Config.client
+
(** Get the TLS client configuration. *)
+
+
val servername : t -> string option
+
(** Get the SNI server name override, if any. *)
+
+
(** {1 Pretty-printing} *)
+
+
val pp : t Fmt.t
+
(** Pretty-printer for TLS configuration. *)