TCP/TLS connection pooling for Eio

Better Eio errors: drop redundant logging and let Eio.Io exceptions carry the context

Changed files
+14 -70
lib
-5
lib/config.ml
···
(Printf.sprintf "connect_retry_delay must be positive, got %.2f"
connect_retry_delay);
-
Log.debug (fun m ->
-
m
-
"Creating config: max_connections=%d, max_idle=%.1fs, \
-
max_lifetime=%.1fs"
-
max_connections_per_endpoint max_idle_time max_connection_lifetime);
{
max_connections_per_endpoint;
max_idle_time;
+14 -64
lib/conpool.ml
···
(** {1 DNS Resolution} *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
-
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
+
Log.debug (fun m -> m "Resolving %a" Endpoint.pp endpoint);
try
let addrs =
Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
~service:(string_of_int (Endpoint.port endpoint))
in
-
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
match addrs with
| addr :: _ ->
Log.debug (fun m ->
m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
addr
| [] ->
-
Log.err (fun m ->
-
m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
+
(* Raise exception with error code - context will be added when caught *)
raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
with Eio.Io _ as ex ->
let bt = Printexc.get_raw_backtrace () in
···
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
attempt last_error =
let retry_count = Config.connect_retry_count pool.config in
-
if attempt > retry_count then begin
-
Log.err (fun m ->
-
m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
-
retry_count);
-
raise (err (Connection_failed { endpoint; attempts = retry_count; last_error }))
-
end;
+
if attempt > retry_count then
+
(* Raise exception with error code - context will be added when caught *)
+
raise (err (Connection_failed { endpoint; attempts = retry_count; last_error }));
Log.debug (fun m ->
m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
···
try
let addr = resolve_endpoint pool endpoint in
-
Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);
(* Connect with optional timeout *)
let socket =
···
(Config.on_connection_closed pool.config)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
-
Log.debug (fun m ->
-
m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
-
(* First try with read lock *)
match
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
Hashtbl.find_opt pool.endpoints endpoint)
with
| Some ep_pool ->
-
Log.debug (fun m ->
-
m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
ep_pool
| None ->
-
Log.debug (fun m ->
-
m "No existing pool, need to create for %a" Endpoint.pp endpoint);
(* Need to create - use write lock *)
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
(* Check again in case another fiber created it *)
match Hashtbl.find_opt pool.endpoints endpoint with
| Some ep_pool ->
-
Log.debug (fun m ->
-
m "Another fiber created pool for %a" Endpoint.pp endpoint);
ep_pool
| None ->
(* Create new endpoint pool *)
···
let mutex = Eio.Mutex.create () in
Log.info (fun m ->
-
m "Creating new endpoint pool for %a (max_connections=%d)"
+
m "Creating endpoint pool for %a (max_connections=%d)"
Endpoint.pp endpoint
(Config.max_connections_per_endpoint pool.config));
-
Log.debug (fun m ->
-
m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
-
let eio_pool =
Eio.Pool.create
(Config.max_connections_per_endpoint pool.config)
~validate:(fun conn ->
-
Log.debug (fun m ->
-
m "Validate called for connection to %a" Endpoint.pp
-
endpoint);
-
(* Called before reusing from pool *)
let healthy = is_healthy pool ~check_readable:false conn in
-
if healthy then (
-
Log.debug (fun m ->
-
m "Reusing connection to %a from pool" Endpoint.pp
-
endpoint);
-
(* Update stats for reuse *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.total_reused <- stats.total_reused + 1);
···
| Some check -> (
try check (Connection.flow conn) with _ -> false)
| None -> true)
-
else begin
-
Log.debug (fun m ->
-
m
-
"Connection to %a failed validation, creating new \
-
one"
-
Endpoint.pp endpoint);
-
false
-
end)
+
else
+
false)
~dispose:(fun conn ->
(* Called when removing from pool *)
Eio.Cancel.protect (fun () ->
···
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.total_closed <- stats.total_closed + 1)))
(fun () ->
-
Log.debug (fun m ->
-
m "Factory function called for %a" Endpoint.pp endpoint);
try
let conn = create_connection pool endpoint in
-
Log.debug (fun m ->
-
m "Connection created successfully for %a" Endpoint.pp
-
endpoint);
-
(* Update stats *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.total_created <- stats.total_created + 1);
···
(Config.on_connection_created pool.config);
conn
-
with e ->
-
Log.err (fun m ->
-
m "Factory function failed for %a: %s" Endpoint.pp
-
endpoint (Printexc.to_string e));
-
(* Update error stats *)
+
with Eio.Io _ as ex ->
+
(* Eio.Io exceptions already have full context from create_connection.
+
Just update error stats and let the exception propagate. *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.errors <- stats.errors + 1);
-
raise e)
+
raise ex)
in
-
-
Log.debug (fun m ->
-
m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
let ep_pool = { pool = eio_pool; stats; mutex } in
-
Hashtbl.add pool.endpoints endpoint ep_pool;
-
Log.debug (fun m ->
-
m "Endpoint pool added to hashtable for %a" Endpoint.pp
-
endpoint);
ep_pool)
(** {1 Public API - Pool Creation} *)
···
`Stop_daemon
with e ->
-
(* Error - close connection so it won't be reused *)
-
Log.warn (fun m ->
-
m "Error with connection to %a: %s" Endpoint.pp endpoint
-
(Printexc.to_string e));
+
(* Error during connection usage - close so it won't be reused.
+
The exception already has context from where it was raised. *)
close_internal pool conn;
(* Update error stats *)
-1
lib/endpoint.ml
···
(* Validate hostname is not empty *)
if String.trim host = "" then invalid_arg "Hostname cannot be empty";
-
Log.debug (fun m -> m "Creating endpoint: %s:%d" host port);
{ host; port }
let host t = t.host