My agentic slop goes here. Not intended for anyone else!

Conpool - TCP/IP Connection Pool Library for Eio#

Overview#

Conpool is a protocol-agnostic TCP/IP connection pooling library built on Eio.Pool. It manages connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.).

Design Philosophy#

Separation of Concerns#

  • Conpool: Manages TCP sockets and their lifecycle (connect, validate, close)
  • Protocol Libraries (requests, redis-eio, etc.): Handle protocol-level logic (HTTP requests, Redis commands)

Key Principles#

  1. Protocol Agnostic: Works with any TCP-based protocol
  2. Eio.Pool Foundation: Leverages Eio.Pool for resource management and limits
  3. Per-Endpoint Pooling: Separate pool per (host, port, tls) tuple
  4. Connection Validation: Health checks, age limits, idle timeouts
  5. Structured Concurrency: All resources bound to switches
  6. Cancel-Safe: Critical operations protected with Cancel.protect
  7. Observable: Rich statistics and monitoring hooks

Architecture#

Core Types#

(** conpool.mli *)

type endpoint = {
  host : string;
  port : int;
  tls : tls_config option;
}
and tls_config = {
  config : Tls.Config.client;
  servername : string option;
}

type connection
(** Opaque connection handle with metadata *)

type t
(** Connection pool managing multiple endpoints *)

type config = {
  max_connections_per_endpoint : int;
  (** Maximum connections per (host, port, tls) endpoint. Default: 10 *)

  max_idle_time : float;
  (** Maximum time (seconds) a connection can be idle before closure. Default: 60.0 *)

  max_connection_lifetime : float;
  (** Maximum lifetime (seconds) of any connection. Default: 300.0 *)

  max_connection_uses : int option;
  (** Maximum number of times a connection can be reused. None = unlimited. Default: None *)

  health_check : (Eio.Flow.two_way -> bool) option;
  (** Optional health check function. Called before reusing idle connection. Default: None *)

  connect_timeout : float option;
  (** Timeout for establishing new connections. Default: Some 10.0 *)

  on_connection_created : (endpoint -> unit) option;
  (** Hook called when new connection created. Default: None *)

  on_connection_closed : (endpoint -> unit) option;
  (** Hook called when connection closed. Default: None *)

  on_connection_reused : (endpoint -> unit) option;
  (** Hook called when connection reused from pool. Default: None *)
}

val default_config : config
(** Sensible defaults for most use cases *)

Pool Creation#

val create :
  sw:Eio.Switch.t ->
  net:#Eio.Net.t ->
  ?config:config ->
  unit -> t
(** Create connection pool bound to switch.
    All connections will be closed when switch is released. *)

Connection Acquisition & Release#

val with_connection :
  t ->
  endpoint ->
  (Eio.Flow.two_way -> 'a) ->
  'a
(** Acquire connection, use it, automatically release.

    If idle connection available and healthy:
      - Reuse from pool
    Else:
      - Create new connection (may block if endpoint at limit)

    On success: connection returned to pool
    On error: connection closed, not returned to pool

    Example:
    {[
      Conpool.with_connection pool endpoint (fun conn ->
        (* Use conn for HTTP request, Redis command, etc. *)
        Eio.Flow.write conn request_bytes;
        Eio.Flow.read conn response_buf
      )
    ]}
*)

val acquire :
  t ->
  endpoint ->
  connection
(** Manually acquire connection. Must call [release] or [close] later.
    Use [with_connection] instead unless you need explicit control. *)

val release :
  t ->
  connection ->
  unit
(** Return connection to pool. Connection must be in clean state.
    If connection is unhealthy, call [close] instead. *)

val close :
  t ->
  connection ->
  unit
(** Close connection immediately, remove from pool. *)

val get_flow :
  connection ->
  Eio.Flow.two_way
(** Extract underlying Eio flow from connection. *)

Connection Validation#

val is_healthy :
  ?check_readable:bool ->
  connection ->
  bool
(** Check if connection is healthy.

    Validates:
    - Not past max_connection_lifetime
    - Not idle past max_idle_time
    - Not exceeded max_connection_uses
    - Optional: health_check function (from config)
    - Optional: check_readable=true tests if socket still connected via 0-byte read
*)

val validate_and_release :
  t ->
  connection ->
  unit
(** Validate connection health, then release to pool if healthy or close if not.
    Equivalent to: if is_healthy conn then release pool conn else close pool conn *)

Statistics & Monitoring#

type endpoint_stats = {
  active : int;           (** Connections currently in use *)
  idle : int;             (** Connections in pool waiting to be reused *)
  total_created : int;    (** Total connections created (lifetime) *)
  total_reused : int;     (** Total times connections were reused *)
  total_closed : int;     (** Total connections closed *)
  errors : int;           (** Total connection errors *)
}

val stats :
  t ->
  endpoint ->
  endpoint_stats
(** Get statistics for specific endpoint *)

val all_stats :
  t ->
  (endpoint * endpoint_stats) list
(** Get statistics for all endpoints in pool *)

val pp_stats :
  Format.formatter ->
  endpoint_stats ->
  unit
(** Pretty-print endpoint statistics *)

Pool Management#

val close_idle_connections :
  t ->
  endpoint ->
  unit
(** Close all idle connections for endpoint (keeps active ones) *)

val close_all_connections :
  t ->
  endpoint ->
  unit
(** Close all connections for endpoint (blocks until active ones released) *)

val close_pool :
  t ->
  unit
(** Close entire pool. Blocks until all active connections released.
    Automatically called when switch releases. *)

Implementation Details#

Per-Endpoint Pool Structure#

Each endpoint gets its own Eio.Pool.t managing connections to that destination:

(* Internal implementation *)

type connection_metadata = {
  flow : Eio.Flow.two_way;
  created_at : float;
  mutable last_used : float;
  mutable use_count : int;
  endpoint : endpoint;
}

type connection = connection_metadata

type endpoint_pool = {
  pool : connection Eio.Pool.t;
  stats : endpoint_stats_mutable;
  mutex : Eio.Mutex.t;
}

type t = {
  sw : Eio.Switch.t;
  net : Eio.Net.t;
  config : config;
  endpoints : (endpoint, endpoint_pool) Hashtbl.t;
  endpoints_mutex : Eio.Mutex.t;
}

Connection Lifecycle with Eio.Pool#

let with_connection pool endpoint f =
  (* Get or create endpoint pool *)
  let ep_pool = get_or_create_endpoint_pool pool endpoint in

  (* Use Eio.Pool.use for resource management *)
  Eio.Pool.use ep_pool.pool (fun conn ->
    (* Connection acquired from pool or newly created *)

    (* Validate before use *)
    if not (is_healthy ~check_readable:true conn) then (
      (* Connection unhealthy, close and create new one *)
      close_internal pool conn;
      let new_conn = create_connection pool endpoint in

      (* Use new connection with failure boundary *)
      match f new_conn.flow with
      | result ->
          (* Success - update metadata and return to pool *)
          conn.last_used <- Unix.gettimeofday ();
          conn.use_count <- conn.use_count + 1;
          result
      | exception e ->
          (* Error - close connection, don't return to pool *)
          close_internal pool new_conn;
          raise e
    ) else (
      (* Connection healthy, use it *)
      match f conn.flow with
      | result ->
          (* Success - update metadata and return to pool *)
          conn.last_used <- Unix.gettimeofday ();
          conn.use_count <- conn.use_count + 1;
          result
      | exception e ->
          (* Error - close connection, don't return to pool *)
          close_internal pool conn;
          raise e
    )
  )

Eio.Pool Integration#

let create_endpoint_pool pool endpoint =
  (* Eio.Pool.create manages resource limits *)
  let eio_pool = Eio.Pool.create
    pool.config.max_connections_per_endpoint
    ~validate:(fun conn ->
      (* Called before reusing from pool *)
      is_healthy ~check_readable:false conn
    )
    ~dispose:(fun conn ->
      (* Called when removing from pool *)
      Eio.Cancel.protect (fun () ->
        close_internal pool conn
      )
    )
    (fun () ->
      (* Factory: create new connection *)
      create_connection pool endpoint
    )
  in

  {
    pool = eio_pool;
    stats = create_stats ();
    mutex = Eio.Mutex.create ();
  }

Connection Creation with Timeout & TLS#

let create_connection pool endpoint =
  (* Track stats *)
  Eio.Mutex.use_rw pool.endpoints_mutex (fun () ->
    let ep_pool = Hashtbl.find pool.endpoints endpoint in
    ep_pool.stats.total_created <- ep_pool.stats.total_created + 1
  );

  (* Call hook if configured *)
  Option.iter (fun f -> f endpoint) pool.config.on_connection_created;

  (* Connect with optional timeout *)
  let connect_with_timeout () =
    let addr = `Tcp (
      Eio.Net.Ipaddr.V4.loopback, (* TODO: resolve hostname *)
      endpoint.port
    ) in

    match pool.config.connect_timeout with
    | Some timeout ->
        Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
          Eio.Net.connect ~sw:pool.sw pool.net addr
        )
    | None ->
        Eio.Net.connect ~sw:pool.sw pool.net addr
  in

  let flow = connect_with_timeout () in

  (* Wrap with TLS if configured *)
  let flow = match endpoint.tls with
    | None -> flow
    | Some tls_cfg ->
        let host = match tls_cfg.servername with
          | Some name -> Domain_name.(host_exn (of_string_exn name))
          | None -> Domain_name.(host_exn (of_string_exn endpoint.host))
        in
        Tls_eio.client_of_flow ~host tls_cfg.config flow
  in

  {
    flow;
    created_at = Unix.gettimeofday ();
    last_used = Unix.gettimeofday ();
    use_count = 0;
    endpoint;
  }

Connection Validation#

let is_healthy ?(check_readable = false) conn =
  let now = Unix.gettimeofday () in
  let config = (* get config from pool *) in

  (* Check age *)
  let age = now -. conn.created_at in
  if age > config.max_connection_lifetime then
    false

  (* Check idle time *)
  else if (now -. conn.last_used) > config.max_idle_time then
    false

  (* Check use count *)
  else if (match config.max_connection_uses with
           | Some max -> conn.use_count >= max
           | None -> false) then
    false

  (* Optional: custom health check *)
  else if (match config.health_check with
           | Some check -> not (check conn.flow)
           | None -> false) then
    false

  (* Optional: check if socket still connected *)
  else if check_readable then
    try
      (* Try zero-byte read - if socket closed, will raise *)
      let buf = Cstruct.create 0 in
      let _ = Eio.Flow.single_read conn.flow buf in
      true
    with
    | End_of_file -> false
    | _ -> false

  else
    true

Graceful Shutdown with Cancel.protect#

let close_pool pool =
  Eio.Cancel.protect (fun () ->
    (* Close all endpoint pools *)
    Hashtbl.iter (fun endpoint ep_pool ->
      (* Eio.Pool.dispose will call our dispose function for each connection *)
      Eio.Pool.dispose ep_pool.pool
    ) pool.endpoints;

    Hashtbl.clear pool.endpoints
  )

(* Register cleanup with switch *)
let create ~sw ~net ?(config = default_config) () =
  let pool = {
    sw;
    net;
    config;
    endpoints = Hashtbl.create 16;
    endpoints_mutex = Eio.Mutex.create ();
  } in

  (* Auto-cleanup on switch release *)
  Eio.Switch.on_release sw (fun () ->
    close_pool pool
  );

  pool

Usage Examples#

Example 1: HTTP Client Connection Pooling#

open Eio.Std

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->

  (* Create connection pool *)
  let pool = Conpool.create ~sw ~net:env#net () in

  (* Define endpoint *)
  let endpoint = Conpool.{
    host = "example.com";
    port = 443;
    tls = Some {
      config = my_tls_config;
      servername = Some "example.com";
    };
  } in

  (* Make 100 requests - connections will be reused *)
  for i = 1 to 100 do
    Conpool.with_connection pool endpoint (fun conn ->
      (* Send HTTP request *)
      let request = Printf.sprintf
        "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" in
      Eio.Flow.write conn [Cstruct.of_string request];

      (* Read response *)
      let buf = Cstruct.create 4096 in
      let n = Eio.Flow.single_read conn buf in
      Printf.printf "Response %d: %d bytes\n" i n
    )
  done;

  (* Print statistics *)
  let stats = Conpool.stats pool endpoint in
  Printf.printf "Created: %d, Reused: %d\n"
    stats.total_created stats.total_reused

Example 2: Redis Client with Health Checks#

let redis_health_check flow =
  (* Send PING command *)
  try
    Eio.Flow.write flow [Cstruct.of_string "PING\r\n"];
    let buf = Cstruct.create 7 in
    let n = Eio.Flow.single_read flow buf in
    (* Check for "+PONG\r\n" response *)
    n = 7 && Cstruct.to_string buf = "+PONG\r\n"
  with
  | _ -> false

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->

  let config = Conpool.{
    default_config with
    health_check = Some redis_health_check;
    max_idle_time = 30.0;  (* Redis connections timeout quickly *)
    max_connections_per_endpoint = 50;
  } in

  let pool = Conpool.create ~sw ~net:env#net ~config () in

  let redis_endpoint = Conpool.{
    host = "localhost";
    port = 6379;
    tls = None;
  } in

  (* Connection automatically validated with PING before reuse *)
  Conpool.with_connection pool redis_endpoint (fun conn ->
    Eio.Flow.write conn [Cstruct.of_string "GET mykey\r\n"];
    (* ... read response ... *)
  )

Example 3: PostgreSQL with Connection Limits#

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->

  let config = Conpool.{
    default_config with
    max_connections_per_endpoint = 20;  (* PostgreSQL default max_connections *)
    max_connection_lifetime = 3600.0;   (* 1 hour max lifetime *)
    max_connection_uses = Some 1000;    (* Recycle after 1000 queries *)
  } in

  let pool = Conpool.create ~sw ~net:env#net ~config () in

  let db_endpoint = Conpool.{
    host = "db.example.com";
    port = 5432;
    tls = Some { config = tls_config; servername = None };
  } in

  (* 100 concurrent queries - limited to 20 connections *)
  Eio.Fiber.all (List.init 100 (fun i ->
    (fun () ->
      Conpool.with_connection pool db_endpoint (fun conn ->
        (* Execute query on conn *)
        Printf.printf "Query %d\n" i
      )
    )
  ))

Example 4: Manual Connection Management#

(* Advanced: manual acquire/release for transactions *)
let with_transaction pool endpoint f =
  let conn = Conpool.acquire pool endpoint in

  try
    (* Begin transaction *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "BEGIN\r\n"];

    (* Execute user code *)
    let result = f (Conpool.get_flow conn) in

    (* Commit *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "COMMIT\r\n"];

    (* Return connection to pool *)
    Conpool.release pool conn;

    result
  with e ->
    (* Rollback on error *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "ROLLBACK\r\n"];

    (* Connection still usable, return to pool *)
    Conpool.release pool conn;

    raise e

Integration with Requests Library#

The requests library will use Conpool for TCP connection management:

(* requests/lib/one.ml *)

type ('a,'b) t = {
  clock : 'a;
  net : 'b;
  default_headers : Headers.t;
  timeout : Timeout.t;
  (* ... other fields ... *)
  connection_pool : Conpool.t;  (* NEW *)
}

let request ~sw ?client ~method_ url =
  let client = get_client client in

  (* Parse URL to endpoint *)
  let uri = Uri.of_string url in
  let endpoint = uri_to_conpool_endpoint uri client.tls_config in

  (* Acquire connection from pool *)
  Conpool.with_connection client.connection_pool endpoint (fun tcp_conn ->
    (* Create cohttp client wrapping this connection *)
    let cohttp_client = make_cohttp_client_from_flow tcp_conn in

    (* Make HTTP request over pooled connection *)
    let resp, resp_body =
      Cohttp_eio.Client.call ~sw cohttp_client method_ uri
        ~headers ?body
    in

    (* Must fully drain body before with_connection returns *)
    (* Otherwise connection in inconsistent state *)
    Eio.Cancel.protect (fun () ->
      let body_data = Eio.Buf_read.parse_exn take_all resp_body
        ~max_size:max_int in

      (* Connection auto-returned to pool when with_connection exits *)
      Response.make ~sw ~status ~headers
        ~body:(string_source body_data) ~url ~elapsed
    )
  )

Advanced Features#

DNS Caching & Resolution#

Conpool will cache DNS lookups per hostname:

type t = {
  (* ... existing fields ... *)
  dns_cache : (string, Eio.Net.Sockaddr.t list) Hashtbl.t;
  dns_cache_mutex : Eio.Mutex.t;
  dns_cache_ttl : float;  (* Default: 300.0 seconds *)
}

let resolve_hostname pool host port =
  (* Check cache first *)
  (* Fall back to Eio.Net.getaddrinfo_stream *)
  (* Cache result with TTL *)

Connection Warming#

Pre-establish connections to reduce first-request latency:

val warm_endpoint :
  t ->
  endpoint ->
  count:int ->
  unit
(** Pre-create [count] connections to endpoint and add to pool *)

Circuit Breaker Integration#

type circuit_breaker_state =
  | Closed   (* Normal operation *)
  | Open     (* Failing, reject requests *)
  | HalfOpen (* Testing if recovered *)

val with_circuit_breaker :
  t ->
  endpoint ->
  failure_threshold:int ->
  timeout:float ->
  (Eio.Flow.two_way -> 'a) ->
  'a
(** Wrap with_connection with circuit breaker pattern *)

Testing Strategy#

Unit Tests#

  1. Connection lifecycle:

    • Create → Use → Release → Reuse
    • Create → Use → Error → Close (not reused)
  2. Validation:

    • Age expiration
    • Idle timeout
    • Use count limit
    • Health check failure
  3. Concurrency:

    • Multiple fibers acquiring from same endpoint
    • Limit enforcement (blocks at max_connections_per_endpoint)
    • Thread-safety of pool operations
  4. Cancel safety:

    • Cancel during connection creation
    • Cancel during use
    • Cancel during release
    • Pool cleanup on switch release

Integration Tests#

  1. Real TCP servers:

    • HTTP server with keep-alive
    • Echo server for connection reuse
    • Server that closes connections to test validation
  2. Performance:

    • Connection reuse speedup (10x for 100 requests)
    • Concurrent request handling
    • Pool contention under load

File Structure#

conpool/
├── CLAUDE.md              (this file)
├── dune-project
├── conpool.opam
├── lib/
│   ├── dune
│   ├── conpool.ml
│   ├── conpool.mli
│   └── conpool_stats.ml   (statistics tracking)
├── test/
│   ├── dune
│   ├── test_lifecycle.ml
│   ├── test_validation.ml
│   ├── test_concurrency.ml
│   └── test_integration.ml
└── examples/
    ├── http_client.ml
    ├── redis_client.ml
    └── postgres_client.ml

Dependencies#

eio >= 1.0
tls-eio >= 1.0

Comparison with Other Approaches#

vs. cohttp-lwt Connection_cache#

cohttp-lwt has Connection_cache module for HTTP connection pooling:

  • Limited: HTTP-specific, tied to cohttp
  • Conpool: Protocol-agnostic, reusable across libraries

vs. Per-Request Connections (current requests impl)#

Current requests library creates new connection per request:

  • Overhead: 3-way handshake + TLS handshake every request
  • Conpool: Reuse connections, 5-10x speedup

vs. Global Connection Pool#

Some libraries use global singleton pools:

  • Inflexible: Can't configure per-client
  • Conpool: Pool per Conpool.t instance, fine-grained control

Future Enhancements#

  1. HTTP/2 & HTTP/3 Support

    • Track streams per connection
    • Multiplexing-aware pooling
  2. Metrics Export

    • Prometheus exporter
    • OpenTelemetry integration
  3. Advanced Health Checks

    • Periodic background health checks
    • Adaptive health check frequency
  4. Connection Affinity

    • Sticky connections for stateful protocols
    • Session-aware pooling
  5. Proxy Support

    • HTTP CONNECT tunneling
    • SOCKS5 proxy
  6. Unix Domain Sockets

    • Pool UDS connections (e.g., local Redis, PostgreSQL)

Summary#

Conpool provides:

  • Protocol-agnostic TCP/IP connection pooling
  • Eio.Pool foundation for resource management
  • Per-endpoint isolation and limits
  • Connection validation (age, idle, health checks)
  • Cancel-safe operations with Cancel.protect
  • Rich statistics and monitoring
  • Drop-in integration with requests, redis-eio, etc.

This design separates TCP pooling from protocol logic, making it a reusable foundation for any TCP-based Eio library.