TCP/TLS connection pooling for Eio

eio-context

Changed files
+72 -69
lib
+51 -53
lib/conpool.ml
···
| Invalid_config of string
| Invalid_endpoint of string
-
exception Pool_error of error
-
let pp_error ppf = function
| Dns_resolution_failed { hostname } ->
Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
···
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
-
let addrs =
-
Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
-
~service:(string_of_int (Endpoint.port endpoint))
-
in
-
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
-
match addrs with
-
| addr :: _ ->
-
Log.debug (fun m ->
-
m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
-
addr
-
| [] ->
-
Log.err (fun m ->
-
m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
-
raise
-
(Pool_error
-
(Dns_resolution_failed { hostname = Endpoint.host endpoint }))
(** {1 Connection Creation with Retry} *)
···
Log.err (fun m ->
m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
retry_count);
-
raise
-
(Pool_error
-
(Connection_failed { endpoint; attempts = retry_count; last_error }))
end;
Log.debug (fun m ->
···
(* Connect with optional timeout *)
let socket =
-
match Config.connect_timeout pool.config with
-
| Some timeout ->
-
Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
-
Eio.Net.connect ~sw:pool.sw pool.net addr)
-
| None -> Eio.Net.connect ~sw:pool.sw pool.net addr
in
Log.debug (fun m ->
···
| None ->
(socket :> connection)
| Some tls_config ->
-
Log.debug (fun m ->
-
m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
-
let host =
-
Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
-
in
-
let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
-
Log.info (fun m ->
-
m "TLS connection established to %a" Endpoint.pp endpoint);
-
(tls_flow :> connection)
in
let now = get_time pool in
···
mutex = Eio.Mutex.create ();
}
with
-
| Eio.Time.Timeout as e ->
Log.warn (fun m ->
m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
-
let error_msg = Printexc.to_string e in
if attempt >= Config.connect_retry_count pool.config then
(* Last attempt - convert to our error type *)
match Config.connect_timeout pool.config with
| Some timeout ->
-
raise (Pool_error (Connection_timeout { endpoint; timeout }))
| None ->
-
raise
-
(Pool_error
-
(Connection_failed
-
{ endpoint; attempts = attempt; last_error = error_msg }))
else begin
(* Retry with exponential backoff *)
let delay =
···
*. (2.0 ** float_of_int (attempt - 1))
in
Eio.Time.sleep pool.clock delay;
-
create_connection_with_retry pool endpoint (attempt + 1) error_msg
end
-
| e ->
-
(* Other errors - retry with backoff *)
-
let error_msg = Printexc.to_string e in
Log.warn (fun m ->
m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
endpoint error_msg);
···
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1) error_msg)
else
-
raise
-
(Pool_error
-
(Connection_failed
-
{ endpoint; attempts = attempt; last_error = error_msg }))
let create_connection (pool : ('clock, 'net) internal) endpoint =
create_connection_with_retry pool endpoint 1 "No attempts made"
···
let with_connection t endpoint f =
Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))
-
-
let with_connection_exn t endpoint f =
-
try with_connection t endpoint f with Pool_error e -> raise (err e)
(** {1 Public API - Statistics} *)
···
| Invalid_config of string
| Invalid_endpoint of string
let pp_error ppf = function
| Dns_resolution_failed { hostname } ->
Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
···
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
+
try
+
let addrs =
+
Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
+
~service:(string_of_int (Endpoint.port endpoint))
+
in
+
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
+
match addrs with
+
| addr :: _ ->
+
Log.debug (fun m ->
+
m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
+
addr
+
| [] ->
+
Log.err (fun m ->
+
m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
+
raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
(** {1 Connection Creation with Retry} *)
···
Log.err (fun m ->
m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
retry_count);
+
raise (err (Connection_failed { endpoint; attempts = retry_count; last_error }))
end;
Log.debug (fun m ->
···
(* Connect with optional timeout *)
let socket =
+
try
+
match Config.connect_timeout pool.config with
+
| Some timeout ->
+
Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
+
Eio.Net.connect ~sw:pool.sw pool.net addr)
+
| None -> Eio.Net.connect ~sw:pool.sw pool.net addr
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint
in
Log.debug (fun m ->
···
| None ->
(socket :> connection)
| Some tls_config ->
+
try
+
Log.debug (fun m ->
+
m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
+
let host =
+
Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
+
in
+
let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
+
Log.info (fun m ->
+
m "TLS connection established to %a" Endpoint.pp endpoint);
+
(tls_flow :> connection)
+
with Eio.Io _ as ex ->
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint
in
let now = get_time pool in
···
mutex = Eio.Mutex.create ();
}
with
+
| Eio.Time.Timeout ->
Log.warn (fun m ->
m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
if attempt >= Config.connect_retry_count pool.config then
(* Last attempt - convert to our error type *)
match Config.connect_timeout pool.config with
| Some timeout ->
+
raise (err (Connection_timeout { endpoint; timeout }))
| None ->
+
raise (err (Connection_failed
+
{ endpoint; attempts = attempt; last_error = "Timeout" }))
else begin
(* Retry with exponential backoff *)
let delay =
···
*. (2.0 ** float_of_int (attempt - 1))
in
Eio.Time.sleep pool.clock delay;
+
create_connection_with_retry pool endpoint (attempt + 1) "Timeout"
end
+
| Eio.Io _ as ex ->
+
(* Eio IO errors - retry with backoff and add context on final failure *)
+
let error_msg = Printexc.to_string ex in
Log.warn (fun m ->
m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
endpoint error_msg);
···
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1) error_msg)
else
+
let bt = Printexc.get_raw_backtrace () in
+
Eio.Exn.reraise_with_context ex bt "after %d retry attempts" attempt
let create_connection (pool : ('clock, 'net) internal) endpoint =
create_connection_with_retry pool endpoint 1 "No attempts made"
···
let with_connection t endpoint f =
Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))
(** {1 Public API - Statistics} *)
+21 -16
lib/conpool.mli
···
| Invalid_config of string (** Invalid configuration parameter *)
| Invalid_endpoint of string (** Invalid endpoint specification *)
-
exception Pool_error of error
-
(** Exception raised by pool operations.
-
Most pool operations can raise this exception. Use {!pp_error} to get
-
human-readable error messages. *)
-
type Eio.Exn.err += E of error
-
(** Extension of Eio's error type for connection pool errors. *)
val err : error -> exn
(** [err e] is [Eio.Exn.create (E e)].
This converts a connection pool error to an Eio exception, allowing it to
-
be handled uniformly with other Eio I/O errors. *)
val pp_error : error Fmt.t
-
(** Pretty-printer for error values. *)
(** {1 Connection Types} *)
···
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
Eio.Buf_read.take_all buf)
]} *)
-
-
val with_connection_exn : t -> Endpoint.t -> (connection -> 'a) -> 'a
-
(** [with_connection_exn pool endpoint fn] is like {!with_connection} but
-
converts {!Pool_error} exceptions to [Eio.Io] exceptions for better
-
integration with Eio error handling.
-
-
This is useful when you want pool errors to be handled uniformly with other
-
Eio I/O errors. *)
(** {1 Statistics & Monitoring} *)
···
| Invalid_config of string (** Invalid configuration parameter *)
| Invalid_endpoint of string (** Invalid endpoint specification *)
+
type Eio.Exn.err += E of error
+
(** Extension of Eio's error type for connection pool errors.
+
+
Pool operations raise [Eio.Io] exceptions with context information added at
+
each layer. The innermost error is often [E error], wrapped with context
+
strings that describe the operation being performed.
+
Example error message:
+
{[
+
Eio.Io Conpool Dns_resolution_failed { hostname = "invalid.example" },
+
resolving invalid.example:443,
+
connecting to invalid.example:443,
+
after 3 retry attempts
+
]}
+
Use {!pp_error} to format just the error code, or let Eio format the full
+
exception with context. *)
val err : error -> exn
(** [err e] is [Eio.Exn.create (E e)].
This converts a connection pool error to an Eio exception, allowing it to
+
be handled uniformly with other Eio I/O errors and enabling context to be
+
added via [Eio.Exn.reraise_with_context]. *)
val pp_error : error Fmt.t
+
(** Pretty-printer for error values (without context).
+
+
For full error messages including context, use [Eio.Exn.pp] or simply let
+
the exception be printed naturally. *)
(** {1 Connection Types} *)
···
let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
Eio.Buf_read.take_all buf)
]} *)
(** {1 Statistics & Monitoring} *)