TCP/TLS connection pooling for Eio

more validation

+28
lib/config.ml
···
?on_connection_closed
?on_connection_reused
() =
+
(* Validate parameters *)
+
if max_connections_per_endpoint <= 0 then
+
invalid_arg (Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
+
max_connections_per_endpoint);
+
+
if max_idle_time <= 0.0 then
+
invalid_arg (Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
+
+
if max_connection_lifetime <= 0.0 then
+
invalid_arg (Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
+
max_connection_lifetime);
+
+
(match max_connection_uses with
+
| Some n when n <= 0 ->
+
invalid_arg (Printf.sprintf "max_connection_uses must be positive, got %d" n)
+
| _ -> ());
+
+
if connect_timeout <= 0.0 then
+
invalid_arg (Printf.sprintf "connect_timeout must be positive, got %.2f" connect_timeout);
+
+
if connect_retry_count < 0 then
+
invalid_arg (Printf.sprintf "connect_retry_count must be non-negative, got %d"
+
connect_retry_count);
+
+
if connect_retry_delay <= 0.0 then
+
invalid_arg (Printf.sprintf "connect_retry_delay must be positive, got %.2f"
+
connect_retry_delay);
+
Log.debug (fun m ->
m "Creating config: max_connections=%d, max_idle=%.1fs, max_lifetime=%.1fs"
max_connections_per_endpoint max_idle_time max_connection_lifetime);
+15 -3
lib/connection.ml
···
mutable last_used : float;
mutable use_count : int;
endpoint : Endpoint.t;
+
mutex : Eio.Mutex.t;
}
let flow t = t.flow
let endpoint t = t.endpoint
let created_at t = t.created_at
-
let last_used t = t.last_used
-
let use_count t = t.use_count
+
+
let last_used t =
+
Eio.Mutex.use_ro t.mutex (fun () -> t.last_used)
+
+
let use_count t =
+
Eio.Mutex.use_ro t.mutex (fun () -> t.use_count)
+
+
let update_usage t ~now =
+
Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
+
t.last_used <- now;
+
t.use_count <- t.use_count + 1
+
)
let pp ppf t =
+
let uses = Eio.Mutex.use_ro t.mutex (fun () -> t.use_count) in
Fmt.pf ppf "Connection(endpoint=%a, age=%.2fs, uses=%d)"
Endpoint.pp t.endpoint
(Unix.gettimeofday () -. t.created_at)
-
t.use_count
+
uses
+50 -15
lib/conpool.ml
···
module Stats = Stats
module Cmd = Cmd
+
(** {1 Error Types} *)
+
+
type error =
+
| Dns_resolution_failed of { hostname : string }
+
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
+
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
+
| Invalid_config of string
+
| Invalid_endpoint of string
+
+
exception Pool_error of error
+
+
let pp_error ppf = function
+
| Dns_resolution_failed { hostname } ->
+
Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
+
| Connection_failed { endpoint; attempts; last_error } ->
+
Fmt.pf ppf "Failed to connect to %a after %d attempts: %s"
+
Endpoint.pp endpoint attempts last_error
+
| Connection_timeout { endpoint; timeout } ->
+
Fmt.pf ppf "Connection timeout to %a after %.2fs"
+
Endpoint.pp endpoint timeout
+
| Invalid_config msg ->
+
Fmt.pf ppf "Invalid configuration: %s" msg
+
| Invalid_endpoint msg ->
+
Fmt.pf ppf "Invalid endpoint: %s" msg
+
type endp_stats = {
mutable active : int;
mutable idle : int;
···
addr
| [] ->
Log.err (fun m -> m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
-
failwith (Printf.sprintf "Failed to resolve hostname: %s" (Endpoint.host endpoint))
+
raise (Pool_error (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
(** {1 Connection Creation with Retry} *)
-
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint attempt =
+
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint attempt last_error =
let retry_count = Config.connect_retry_count pool.config in
if attempt > retry_count then begin
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
Endpoint.pp endpoint retry_count);
-
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
-
(Endpoint.host endpoint) (Endpoint.port endpoint) retry_count)
+
raise (Pool_error (Connection_failed { endpoint; attempts = retry_count; last_error }))
end;
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
···
last_used = now;
use_count = 0;
endpoint;
+
mutex = Eio.Mutex.create ();
}
with
-
| Eio.Time.Timeout ->
+
| Eio.Time.Timeout as e ->
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
-
(* Exponential backoff *)
-
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
-
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1)
+
let error_msg = Printexc.to_string e in
+
if attempt >= Config.connect_retry_count pool.config then
+
(* Last attempt - convert to our error type *)
+
match Config.connect_timeout pool.config with
+
| Some timeout ->
+
raise (Pool_error (Connection_timeout { endpoint; timeout }))
+
| None ->
+
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
+
else begin
+
(* Retry with exponential backoff *)
+
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
+
Eio.Time.sleep pool.clock delay;
+
create_connection_with_retry pool endpoint (attempt + 1) error_msg
+
end
| e ->
(* Other errors - retry with backoff *)
+
let error_msg = Printexc.to_string e in
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
-
attempt Endpoint.pp endpoint (Printexc.to_string e));
+
attempt Endpoint.pp endpoint error_msg);
if attempt < Config.connect_retry_count pool.config then (
let delay = Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1)) in
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1)
+
create_connection_with_retry pool endpoint (attempt + 1) error_msg
) else
-
raise e
+
raise (Pool_error (Connection_failed { endpoint; attempts = attempt; last_error = error_msg }))
let create_connection (pool : ('clock, 'net) internal) endpoint =
-
create_connection_with_retry pool endpoint 1
+
create_connection_with_retry pool endpoint 1 "No attempts made"
(** {1 Connection Validation} *)
···
Endpoint.pp endpoint (Connection.use_count conn));
(* Update last used time and use count *)
-
conn.last_used <- get_time pool;
-
conn.use_count <- conn.use_count + 1;
+
Connection.update_usage conn ~now:(get_time pool);
(* Update idle stats (connection taken from idle pool) *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
+27
lib/conpool.mli
···
(** Cmdliner terms for connection pool configuration *)
module Cmd : module type of Cmd
+
(** {1 Errors} *)
+
+
type error =
+
| Dns_resolution_failed of { hostname : string }
+
(** DNS resolution failed for the given hostname *)
+
+
| Connection_failed of { endpoint : Endpoint.t; attempts : int; last_error : string }
+
(** Failed to establish connection after all retry attempts *)
+
+
| Connection_timeout of { endpoint : Endpoint.t; timeout : float }
+
(** Connection attempt timed out *)
+
+
| Invalid_config of string
+
(** Invalid configuration parameter *)
+
+
| Invalid_endpoint of string
+
(** Invalid endpoint specification *)
+
+
exception Pool_error of error
+
(** Exception raised by pool operations.
+
+
Most pool operations can raise this exception. Use {!pp_error} to get
+
human-readable error messages. *)
+
+
val pp_error : error Fmt.t
+
(** Pretty-printer for error values. *)
+
(** {1 Connection Pool} *)
type t
+8
lib/endpoint.ml
···
}
let make ~host ~port =
+
(* Validate port range *)
+
if port < 1 || port > 65535 then
+
invalid_arg (Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
+
+
(* Validate hostname is not empty *)
+
if String.trim host = "" then
+
invalid_arg "Hostname cannot be empty";
+
Log.debug (fun m -> m "Creating endpoint: %s:%d" host port);
{ host; port }