TCP/TLS connection pooling for Eio

refine

+1 -1
lib/config.ml
···
max_connection_lifetime : float;
max_connection_uses : int option;
health_check :
-
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) option;
+
([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option;
connect_timeout : float option;
connect_retry_count : int;
connect_retry_delay : float;
+2 -3
lib/config.mli
···
?max_idle_time:float ->
?max_connection_lifetime:float ->
?max_connection_uses:int ->
-
?health_check:
-
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) ->
+
?health_check:([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) ->
?connect_timeout:float ->
?connect_retry_count:int ->
?connect_retry_delay:float ->
···
(** Get maximum connection uses, if any. *)
val health_check :
-
t -> ([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) option
+
t -> ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option
(** Get custom health check function, if any. *)
val connect_timeout : t -> float option
+1 -1
lib/connection.ml
···
module Log = (val Logs.src_log src : Logs.LOG)
type t = {
-
flow : [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t;
+
flow : [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t;
created_at : float;
mutable last_used : float;
mutable use_count : int;
+79 -37
lib/conpool.ml
···
| Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
| Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
+
type Eio.Exn.err += E of error
+
+
let err e = Eio.Exn.create (E e)
+
+
let () =
+
Eio.Exn.register_pp (fun f -> function
+
| E e ->
+
Fmt.string f "Conpool ";
+
pp_error f e;
+
true
+
| _ -> false)
+
+
(** {1 Connection Types} *)
+
+
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
+
type connection = connection_ty Eio.Resource.t
+
type endp_stats = {
mutable active : int;
mutable idle : int;
···
let flow =
match pool.tls with
| None ->
-
(socket :> [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t)
+
(socket :> connection)
| Some tls_cfg ->
Log.debug (fun m ->
m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
···
in
Log.info (fun m ->
m "TLS connection established to %a" Endpoint.pp endpoint);
-
(tls_flow :> [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t)
+
(tls_flow :> connection)
in
let now = get_time pool in
···
(** {1 Public API - Connection Management} *)
-
let with_connection (T pool) endpoint f =
+
let connection_internal ~sw (T pool) endpoint =
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
let ep_pool = get_or_create_endpoint_pool pool endpoint in
+
+
(* Create promises for connection handoff and cleanup signal *)
+
let conn_promise, conn_resolver = Eio.Promise.create () in
+
let done_promise, done_resolver = Eio.Promise.create () in
(* Increment active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
ep_pool.stats.active <- ep_pool.stats.active + 1);
-
Fun.protect
-
~finally:(fun () ->
-
(* Decrement active count *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.active <- ep_pool.stats.active - 1);
-
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
-
(fun () ->
-
(* Use Eio.Pool for resource management *)
-
Eio.Pool.use ep_pool.pool (fun conn ->
-
Log.debug (fun m ->
-
m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
-
(Connection.use_count conn));
+
(* Fork a daemon fiber to manage the connection lifecycle *)
+
Eio.Fiber.fork_daemon ~sw (fun () ->
+
Fun.protect
+
~finally:(fun () ->
+
(* Decrement active count *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.active <- ep_pool.stats.active - 1);
+
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
+
(fun () ->
+
(* Use Eio.Pool for resource management *)
+
Eio.Pool.use ep_pool.pool (fun conn ->
+
Log.debug (fun m ->
+
m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
+
(Connection.use_count conn));
+
+
(* Update last used time and use count *)
+
Connection.update_usage conn ~now:(get_time pool);
+
+
(* Update idle stats (connection taken from idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
+
+
(* Hand off connection to caller *)
+
Eio.Promise.resolve conn_resolver conn.flow;
+
+
try
+
(* Wait for switch to signal cleanup *)
+
Eio.Promise.await done_promise;
+
+
(* Success - connection will be returned to pool by Eio.Pool *)
+
(* Update idle stats (connection returned to idle pool) *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.idle <- ep_pool.stats.idle + 1);
+
+
`Stop_daemon
+
with e ->
+
(* Error - close connection so it won't be reused *)
+
Log.warn (fun m ->
+
m "Error with connection to %a: %s" Endpoint.pp endpoint
+
(Printexc.to_string e));
+
close_internal pool conn;
-
(* Update last used time and use count *)
-
Connection.update_usage conn ~now:(get_time pool);
+
(* Update error stats *)
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+
ep_pool.stats.errors <- ep_pool.stats.errors + 1);
-
(* Update idle stats (connection taken from idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
+
raise e)));
-
try
-
let result = f conn.flow in
+
(* Signal cleanup when switch ends *)
+
Eio.Switch.on_release sw (fun () ->
+
Eio.Promise.resolve done_resolver ());
-
(* Success - connection will be returned to pool by Eio.Pool *)
-
(* Update idle stats (connection returned to idle pool) *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.idle <- ep_pool.stats.idle + 1);
+
(* Return the connection *)
+
Eio.Promise.await conn_promise
-
result
-
with e ->
-
(* Error - close connection so it won't be reused *)
-
Log.warn (fun m ->
-
m "Error using connection to %a: %s" Endpoint.pp endpoint
-
(Printexc.to_string e));
-
close_internal pool conn;
+
let connection ~sw t endpoint = connection_internal ~sw t endpoint
-
(* Update error stats *)
-
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
-
ep_pool.stats.errors <- ep_pool.stats.errors + 1);
+
let with_connection t endpoint f =
+
Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))
-
raise e))
+
let with_connection_exn t endpoint f =
+
try with_connection t endpoint f with Pool_error e -> raise (err e)
(** {1 Public API - Statistics} *)
+59 -15
lib/conpool.mli
···
(** {1 Core Types} *)
-
module Endpoint : module type of Endpoint
+
module Endpoint = Endpoint
(** Network endpoint representation *)
-
module Tls_config : module type of Tls_config
+
module Tls_config = Tls_config
(** TLS configuration for connection pools *)
-
module Config : module type of Config
+
module Config = Config
(** Configuration for connection pools *)
-
module Stats : module type of Stats
+
module Stats = Stats
(** Statistics for connection pool endpoints *)
-
module Cmd : module type of Cmd
+
module Cmd = Cmd
(** Cmdliner terms for connection pool configuration *)
(** {1 Errors} *)
···
Most pool operations can raise this exception. Use {!pp_error} to get
human-readable error messages. *)
+
+
type Eio.Exn.err += E of error
+
(** Extension of Eio's error type for connection pool errors. *)
+
+
val err : error -> exn
+
(** [err e] is [Eio.Exn.create (E e)].
+
+
This converts a connection pool error to an Eio exception, allowing it to
+
be handled uniformly with other Eio I/O errors. *)
val pp_error : error Fmt.t
(** Pretty-printer for error values. *)
+
(** {1 Connection Types} *)
+
+
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
+
(** The type tags for a pooled connection.
+
Connections support reading, writing, shutdown, and closing. *)
+
+
type connection = connection_ty Eio.Resource.t
+
(** A connection resource from the pool. *)
+
(** {1 Connection Pool} *)
type t
···
(** {1 Connection Usage} *)
-
val with_connection :
-
t ->
-
Endpoint.t ->
-
([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> 'a) ->
-
'a
-
(** Acquire connection, use it, automatically release back to pool.
+
val connection : sw:Eio.Switch.t -> t -> Endpoint.t -> connection
+
(** [connection ~sw pool endpoint] acquires a connection from the pool.
-
If idle connection available and healthy:
-
- Reuse from pool (validates health first) Else:
+
The connection is automatically returned to the pool when [sw] finishes.
+
If the connection becomes unhealthy or an error occurs during use, it is
+
closed instead of being returned to the pool.
+
+
If an idle connection is available and healthy:
+
- Reuse from pool (validates health first)
+
+
Otherwise:
- Create new connection (may block if endpoint at limit)
-
On success: connection returned to pool for reuse On error: connection
-
closed, not returned to pool
+
Example:
+
{[
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
+
Eio.Switch.run (fun sw ->
+
let conn = Conpool.connection ~sw pool endpoint in
+
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
+
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
+
Eio.Buf_read.take_all buf)
+
]} *)
+
+
val with_connection : t -> Endpoint.t -> (connection -> 'a) -> 'a
+
(** [with_connection pool endpoint fn] is a convenience wrapper around
+
{!val:connection}.
+
+
Equivalent to:
+
{[
+
Eio.Switch.run (fun sw -> fn (connection ~sw pool endpoint))
+
]}
Example:
{[
···
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
Eio.Buf_read.take_all buf)
]} *)
+
+
val with_connection_exn : t -> Endpoint.t -> (connection -> 'a) -> 'a
+
(** [with_connection_exn pool endpoint fn] is like {!with_connection} but
+
converts {!Pool_error} exceptions to [Eio.Io] exceptions for better
+
integration with Eio error handling.
+
+
This is useful when you want pool errors to be handled uniformly with other
+
Eio I/O errors. *)
(** {1 Statistics & Monitoring} *)