···
# Conpool - TCP/IP Connection Pool Library for Eio

**Conpool** is a protocol-agnostic TCP/IP connection pooling library built on Eio.Pool. It manages connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.).

### Separation of Concerns

- **Conpool**: Manages TCP sockets and their lifecycle (connect, validate, close)
- **Protocol libraries** (requests, redis-eio, etc.): Handle protocol-level logic (HTTP requests, Redis commands)

### Design Principles

1. **Protocol Agnostic**: Works with any TCP-based protocol
2. **Eio.Pool Foundation**: Leverages Eio.Pool for resource management and limits
3. **Per-Endpoint Pooling**: Separate pool per (host, port, tls) tuple
4. **Connection Validation**: Health checks, age limits, idle timeouts
5. **Structured Concurrency**: All resources bound to switches
6. **Cancel-Safe**: Critical operations protected with `Cancel.protect`
7. **Observable**: Rich statistics and monitoring hooks

## API Design

### Core Types

```ocaml
type tls_config = {
  config : Tls.Config.client;
  servername : string option;  (* SNI name; defaults to the endpoint host *)
}

type endpoint = {
  host : string;
  port : int;
  tls : tls_config option;
}

type connection
(** Opaque connection handle with metadata *)

type t
(** Connection pool managing multiple endpoints *)
```

### Configuration

```ocaml
type config = {
  max_connections_per_endpoint : int;
  (** Maximum connections per (host, port, tls) endpoint. Default: 10 *)

  max_idle_time : float;
  (** Maximum time (seconds) a connection can be idle before closure. Default: 60.0 *)

  max_connection_lifetime : float;
  (** Maximum lifetime (seconds) of any connection. Default: 300.0 *)

  max_connection_uses : int option;
  (** Maximum number of times a connection can be reused. None = unlimited. Default: None *)

  health_check : (Eio.Flow.two_way -> bool) option;
  (** Optional health check function, called before reusing an idle connection. Default: None *)

  connect_timeout : float option;
  (** Timeout (seconds) for establishing new connections. Default: Some 10.0 *)

  on_connection_created : (endpoint -> unit) option;
  (** Hook called when a new connection is created. Default: None *)

  on_connection_closed : (endpoint -> unit) option;
  (** Hook called when a connection is closed. Default: None *)

  on_connection_reused : (endpoint -> unit) option;
  (** Hook called when a connection is reused from the pool. Default: None *)
}

val default_config : config
(** Sensible defaults for most use cases *)
```

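Callers would typically start from `default_config` and override only the fields they care about. A minimal sketch (the field names are the ones declared above; the values are arbitrary illustrations, not recommendations):

```ocaml
(* Sketch: override a few defaults with a functional record update. *)
let config =
  { Conpool.default_config with
    max_connections_per_endpoint = 4;
    connect_timeout = Some 5.0;
    on_connection_created =
      Some (fun ep ->
        Printf.printf "connected to %s:%d\n%!" ep.Conpool.host ep.Conpool.port);
  }
```
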
### Pool Creation

```ocaml
val create :
  sw:Eio.Switch.t ->
  net:#Eio.Net.t ->
  ?config:config ->
  unit ->
  t
(** Create a connection pool bound to [sw].
    All connections will be closed when the switch is released. *)
```

### Connection Acquisition & Release

```ocaml
val with_connection :
  t ->
  endpoint ->
  (Eio.Flow.two_way -> 'a) ->
  'a
(** Acquire a connection, use it, and automatically release it.

    If an idle connection is available and healthy:
    - Reuse it
    Otherwise:
    - Create a new connection (may block if the endpoint is at its limit)

    On success: the connection is returned to the pool.
    On error: the connection is closed, not returned to the pool.

    Example:
    {[
      Conpool.with_connection pool endpoint (fun conn ->
        (* Use conn for an HTTP request, Redis command, etc. *)
        Eio.Flow.write conn request_bytes;
        Eio.Flow.read conn response_buf)
    ]}
*)

val acquire : t -> endpoint -> connection
(** Manually acquire a connection. Must call [release] or [close] later.
    Use [with_connection] instead unless you need explicit control. *)

val release : t -> connection -> unit
(** Return a connection to the pool. The connection must be in a clean state.
    If the connection is unhealthy, call [close] instead. *)

val close : t -> connection -> unit
(** Close a connection immediately and remove it from the pool. *)

val get_flow : connection -> Eio.Flow.two_way
(** Extract the underlying Eio flow from a connection. *)
```

### Connection Validation

```ocaml
val is_healthy :
  ?check_readable:bool ->
  connection ->
  bool
(** Check whether a connection is healthy:

    - Not past max_connection_lifetime
    - Not idle past max_idle_time
    - Not exceeded max_connection_uses
    - Optional: passes the health_check function (from config)
    - Optional: check_readable=true tests whether the socket is still connected via a 0-byte read
*)

val validate_and_release : t -> connection -> unit
(** Validate connection health, then release to the pool if healthy or close if not.
    Equivalent to: [if is_healthy conn then release pool conn else close pool conn] *)
```

### Statistics & Monitoring

```ocaml
type endpoint_stats = {
  active : int;         (** Connections currently in use *)
  idle : int;           (** Connections in the pool waiting to be reused *)
  total_created : int;  (** Total connections created (lifetime) *)
  total_reused : int;   (** Total times connections were reused *)
  total_closed : int;   (** Total connections closed *)
  errors : int;         (** Total connection errors *)
}

val stats : t -> endpoint -> endpoint_stats
(** Get statistics for a specific endpoint *)

val all_stats : t -> (endpoint * endpoint_stats) list
(** Get statistics for all endpoints in the pool *)

val print_stats : t -> unit
(** Pretty-print endpoint statistics *)

val close_idle_connections : t -> endpoint -> unit
(** Close all idle connections for an endpoint (keeps active ones) *)

val close_all_connections : t -> endpoint -> unit
(** Close all connections for an endpoint (blocks until active ones are released) *)

val close_pool : t -> unit
(** Close the entire pool. Blocks until all active connections are released.
    Automatically called when the switch is released. *)
```

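For illustration only, a monitoring fiber could poll these statistics periodically. The `monitor` function below is not part of the proposed API; it uses the `endpoint_stats` fields listed above plus `Eio.Time.sleep`, and would run in a background fiber forked onto the pool's switch:

```ocaml
(* Sketch: log per-endpoint statistics once a minute.
   [clock] is the clock capability from the Eio environment (env#clock). *)
let monitor ~clock pool endpoint =
  while true do
    let s = Conpool.stats pool endpoint in
    Printf.printf "active=%d idle=%d created=%d reused=%d closed=%d errors=%d\n%!"
      s.Conpool.active s.idle s.total_created s.total_reused s.total_closed s.errors;
    Eio.Time.sleep clock 60.0
  done
```
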
## Implementation Details

### Per-Endpoint Pool Structure

Each endpoint gets its own `Eio.Pool.t` managing connections to that destination:

```ocaml
(* Internal implementation *)
type connection_metadata = {
  flow : Eio.Flow.two_way;
  created_at : float;
  mutable last_used : float;
  mutable use_count : int;
}

type connection = connection_metadata

type endpoint_pool = {
  pool : connection Eio.Pool.t;
  stats : endpoint_stats_mutable;
  mutex : Eio.Mutex.t;
}

type t = {
  endpoints : (endpoint, endpoint_pool) Hashtbl.t;
  endpoints_mutex : Eio.Mutex.t;
  config : config;
  sw : Eio.Switch.t;
  net : Eio.Net.t;
  clock : Eio.Time.clock;  (* used for connect timeouts *)
}
```

### Connection Lifecycle with Eio.Pool

```ocaml
let with_connection pool endpoint f =
  (* Get or create endpoint pool *)
  let ep_pool = get_or_create_endpoint_pool pool endpoint in
  (* Use Eio.Pool.use for resource management *)
  Eio.Pool.use ep_pool.pool (fun conn ->
    (* Connection acquired from pool or newly created *)
    (* Validate before use *)
    if not (is_healthy ~check_readable:true conn) then (
      (* Connection unhealthy: close it and create a new one *)
      close_internal pool conn;
      let new_conn = create_connection pool endpoint in
      (* Use new connection with failure boundary *)
      match f new_conn.flow with
      | result ->
        (* Success - update metadata and return to pool *)
        new_conn.last_used <- Unix.gettimeofday ();
        new_conn.use_count <- new_conn.use_count + 1;
        result
      | exception e ->
        (* Error - close connection, don't return to pool *)
        close_internal pool new_conn;
        raise e
    ) else (
      (* Connection healthy, use it *)
      match f conn.flow with
      | result ->
        (* Success - update metadata and return to pool *)
        conn.last_used <- Unix.gettimeofday ();
        conn.use_count <- conn.use_count + 1;
        result
      | exception e ->
        (* Error - close connection, don't return to pool *)
        close_internal pool conn;
        raise e
    ))
```

### Eio.Pool Integration

```ocaml
let create_endpoint_pool pool endpoint =
  (* Eio.Pool.create manages resource limits *)
  let eio_pool =
    Eio.Pool.create
      pool.config.max_connections_per_endpoint
      ~validate:(fun conn ->
        (* Called before reusing from pool *)
        is_healthy ~check_readable:false conn)
      ~dispose:(fun conn ->
        (* Called when removing from pool *)
        Eio.Cancel.protect (fun () ->
          close_internal pool conn))
      (fun () ->
        (* Factory: create new connection *)
        create_connection pool endpoint)
  in
  {
    pool = eio_pool;
    stats = create_stats ();
    mutex = Eio.Mutex.create ();
  }
```

### Connection Creation with Timeout & TLS

```ocaml
let create_connection pool endpoint =
  (* Update stats *)
  Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
    let ep_pool = Hashtbl.find pool.endpoints endpoint in
    ep_pool.stats.total_created <- ep_pool.stats.total_created + 1);

  (* Call hook if configured *)
  Option.iter (fun f -> f endpoint) pool.config.on_connection_created;

  (* Connect with optional timeout *)
  let connect_with_timeout () =
    let addr =
      `Tcp (Eio.Net.Ipaddr.V4.loopback, (* TODO: resolve hostname *)
            endpoint.port)
    in
    match pool.config.connect_timeout with
    | Some timeout ->
      Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
        Eio.Net.connect ~sw:pool.sw pool.net addr)
    | None ->
      Eio.Net.connect ~sw:pool.sw pool.net addr
  in
  let flow = connect_with_timeout () in

  (* Wrap with TLS if configured *)
  let flow =
    match endpoint.tls with
    | Some tls_cfg ->
      let host =
        match tls_cfg.servername with
        | Some name -> Domain_name.(host_exn (of_string_exn name))
        | None -> Domain_name.(host_exn (of_string_exn endpoint.host))
      in
      Tls_eio.client_of_flow ~host tls_cfg.config flow
    | None -> flow
  in
  {
    flow;
    created_at = Unix.gettimeofday ();
    last_used = Unix.gettimeofday ();
    use_count = 0;
  }
```

### Connection Validation

```ocaml
let is_healthy ?(check_readable = false) conn =
  let now = Unix.gettimeofday () in
  let config = ... (* get config from the owning pool *) in
  let age = now -. conn.created_at in
  if age > config.max_connection_lifetime then false
  else if (now -. conn.last_used) > config.max_idle_time then false
  else if (match config.max_connection_uses with
           | Some max -> conn.use_count >= max
           | None -> false)
  then false
  (* Optional: custom health check *)
  else if (match config.health_check with
           | Some check -> not (check conn.flow)
           | None -> false)
  then false
  else if check_readable then (
    (* Optional: check if the socket is still connected.
       Try a zero-byte read - if the socket is closed, this raises. *)
    try
      let buf = Cstruct.create 0 in
      let (_ : int) = Eio.Flow.single_read conn.flow buf in
      true
    with _ -> false)
  else true
```

### Graceful Shutdown with Cancel.protect

```ocaml
let close_pool pool =
  Eio.Cancel.protect (fun () ->
    (* Close all endpoint pools *)
    Hashtbl.iter (fun _endpoint ep_pool ->
      (* Eio.Pool.dispose will call our dispose function for each connection *)
      Eio.Pool.dispose ep_pool.pool)
      pool.endpoints;
    Hashtbl.clear pool.endpoints)

(* Register cleanup with switch *)
let create ~sw ~net ?(config = default_config) () =
  let pool = {
    endpoints = Hashtbl.create 16;
    endpoints_mutex = Eio.Mutex.create ();
    config;
    sw;
    net;
    clock = ... (* clock capability for connect timeouts *);
  } in
  (* Auto-cleanup on switch release *)
  Eio.Switch.on_release sw (fun () -> close_pool pool);
  pool
```

## Usage Examples

### Example 1: HTTP Client Connection Pooling

```ocaml
open Eio.Std

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  (* Create connection pool *)
  let pool = Conpool.create ~sw ~net:env#net () in

  let endpoint = Conpool.{
    host = "example.com";
    port = 443;
    tls = Some { config = my_tls_config; servername = Some "example.com" };
  } in

  (* Make 100 requests - connections will be reused *)
  for i = 1 to 100 do
    Conpool.with_connection pool endpoint (fun conn ->
      (* Send HTTP request *)
      let request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" in
      Eio.Flow.write conn [Cstruct.of_string request];
      let buf = Cstruct.create 4096 in
      let n = Eio.Flow.single_read conn buf in
      Printf.printf "Response %d: %d bytes\n" i n)
  done;

  let stats = Conpool.stats pool endpoint in
  Printf.printf "Created: %d, Reused: %d\n"
    stats.total_created stats.total_reused
```

### Example 2: Redis Client with Health Checks

```ocaml
let redis_health_check flow =
  (* Send PING command *)
  Eio.Flow.write flow [Cstruct.of_string "PING\r\n"];
  let buf = Cstruct.create 7 in
  let n = Eio.Flow.single_read flow buf in
  (* Check for "+PONG\r\n" response *)
  n = 7 && Cstruct.to_string buf = "+PONG\r\n"

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  let config = { Conpool.default_config with
    health_check = Some redis_health_check;
    max_idle_time = 30.0;  (* Redis connections time out quickly *)
    max_connections_per_endpoint = 50;
  } in
  let pool = Conpool.create ~sw ~net:env#net ~config () in

  let redis_endpoint = Conpool.{ host = "localhost"; port = 6379; tls = None } in

  (* Connection automatically validated with PING before reuse *)
  Conpool.with_connection pool redis_endpoint (fun conn ->
    Eio.Flow.write conn [Cstruct.of_string "GET mykey\r\n"];
    (* ... read response ... *)
    ())
```

### Example 3: PostgreSQL with Connection Limits

```ocaml
let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  let config = { Conpool.default_config with
    max_connections_per_endpoint = 20;  (* Stay well under PostgreSQL's max_connections *)
    max_connection_lifetime = 3600.0;   (* 1 hour max lifetime *)
    max_connection_uses = Some 1000;    (* Recycle after 1000 queries *)
  } in
  let pool = Conpool.create ~sw ~net:env#net ~config () in

  let db_endpoint = Conpool.{
    host = "db.example.com";
    port = 5432;
    tls = Some { config = tls_config; servername = None };
  } in

  (* 100 concurrent queries - limited to 20 connections *)
  Eio.Fiber.all (List.init 100 (fun i () ->
    Conpool.with_connection pool db_endpoint (fun conn ->
      (* Execute query on conn *)
      ignore conn;
      Printf.printf "Query %d\n" i)))
```

### Example 4: Manual Connection Management

```ocaml
(* Advanced: manual acquire/release for transactions *)
let with_transaction pool endpoint f =
  let conn = Conpool.acquire pool endpoint in
  try
    (* Begin transaction *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "BEGIN\r\n"];
    (* Execute user code *)
    let result = f (Conpool.get_flow conn) in
    (* Commit *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "COMMIT\r\n"];
    (* Return connection to pool *)
    Conpool.release pool conn;
    result
  with e ->
    (* Rollback on error *)
    Eio.Flow.write (Conpool.get_flow conn)
      [Cstruct.of_string "ROLLBACK\r\n"];
    (* Connection still usable, return to pool *)
    Conpool.release pool conn;
    raise e
```

## Integration with Requests Library

The requests library will use Conpool for TCP connection management:

```ocaml
(* requests/lib/one.ml *)
type client = {
  default_headers : Headers.t;
  (* ... other fields ... *)
  connection_pool : Conpool.t;  (* NEW *)
}

let request ~sw ?client ~method_ url =
  let client = get_client client in
  (* Parse URL to endpoint *)
  let uri = Uri.of_string url in
  let endpoint = uri_to_conpool_endpoint uri client.tls_config in
  (* Acquire connection from pool *)
  Conpool.with_connection client.connection_pool endpoint (fun tcp_conn ->
    (* Create cohttp client wrapping this connection *)
    let cohttp_client = make_cohttp_client_from_flow tcp_conn in
    (* Make HTTP request over pooled connection *)
    let response, resp_body =
      Cohttp_eio.Client.call ~sw cohttp_client method_ uri
    in
    (* Must fully drain the body before with_connection returns;
       otherwise the connection is left in an inconsistent state. *)
    let body_data =
      Eio.Cancel.protect (fun () ->
        Eio.Buf_read.parse_exn Eio.Buf_read.take_all resp_body ~max_size:max_int)
    in
    (* Connection auto-returned to pool when with_connection exits.
       [status], [headers] and [elapsed] are derived from [response] and request timing. *)
    Response.make ~sw ~status ~headers
      ~body:(string_source body_data) ~url ~elapsed)
```

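The `uri_to_conpool_endpoint` helper referenced above is not spelled out in this design. One plausible sketch, mapping a `Uri.t` onto the `endpoint` type from the API section (the default ports and the TLS handling are assumptions):

```ocaml
(* Hypothetical helper: derive a Conpool endpoint from a parsed URL.
   [tls_config] is the client's Tls.Config.client value. *)
let uri_to_conpool_endpoint uri tls_config =
  let scheme = Option.value (Uri.scheme uri) ~default:"http" in
  let https = String.equal scheme "https" in
  let host = Option.value (Uri.host uri) ~default:"localhost" in
  let port =
    match Uri.port uri with
    | Some p -> p
    | None -> if https then 443 else 80
  in
  Conpool.{
    host;
    port;
    tls =
      (if https then Some { config = tls_config; servername = Some host }
       else None);
  }
```
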
### DNS Caching & Resolution

Conpool will cache DNS lookups per hostname:

```ocaml
type t = {
  (* ... existing fields ... *)
  dns_cache : (string, Eio.Net.Sockaddr.t list) Hashtbl.t;
  dns_cache_mutex : Eio.Mutex.t;
  dns_cache_ttl : float;  (* Default: 300.0 seconds *)
}

let resolve_hostname pool host port =
  (* Check cache first *)
  (* Fall back to Eio.Net.getaddrinfo_stream *)
  (* Cache result with TTL *)
  ...
```

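A sketch of how the resolver could honor the TTL is shown below. It assumes each cache entry also carries its resolution timestamp (a small extension of the table type above) and uses `Eio.Net.getaddrinfo_stream` for the actual lookup:

```ocaml
(* Sketch: TTL-aware cached resolution. Assumes the cache stores
   (resolved_at, addrs) pairs rather than bare address lists. *)
let resolve_hostname pool host port =
  let now = Unix.gettimeofday () in
  let cached =
    Eio.Mutex.use_ro pool.dns_cache_mutex (fun () ->
      match Hashtbl.find_opt pool.dns_cache host with
      | Some (resolved_at, addrs) when now -. resolved_at < pool.dns_cache_ttl ->
        Some addrs
      | _ -> None)
  in
  match cached with
  | Some addrs -> addrs
  | None ->
    (* Cache miss or expired entry: resolve, then store with a fresh timestamp *)
    let addrs =
      Eio.Net.getaddrinfo_stream ~service:(string_of_int port) pool.net host
    in
    Eio.Mutex.use_rw ~protect:true pool.dns_cache_mutex (fun () ->
      Hashtbl.replace pool.dns_cache host (now, addrs));
    addrs
```
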
### Connection Warmup

Pre-establish connections to reduce first-request latency:

```ocaml
val warmup : t -> endpoint -> count:int -> unit
(** Pre-create [count] connections to an endpoint and add them to the pool *)
```

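One way such a warmup could be built purely from the manual `acquire`/`release` API sketched earlier (the `warmup` name itself is a placeholder):

```ocaml
(* Sketch: open [count] connections eagerly, then return them all to the pool
   so later requests find warm, idle connections. *)
let warmup pool endpoint ~count =
  let conns = List.init count (fun _ -> Conpool.acquire pool endpoint) in
  List.iter (Conpool.release pool) conns
```
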
### Circuit Breaker Integration

```ocaml
type circuit_breaker_state =
  | Closed    (* Normal operation *)
  | Open      (* Failing, reject requests *)
  | HalfOpen  (* Testing if recovered *)

val with_circuit_breaker :
  t ->
  endpoint ->
  failure_threshold:int ->
  (Eio.Flow.two_way -> 'a) ->
  'a
(** Wrap [with_connection] with the circuit breaker pattern *)
```

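For illustration, the wrapper could track consecutive failures per endpoint and short-circuit once the threshold is reached. The mutable `breaker` record, the `cooldown` parameter, and the transition rules below are assumptions, not part of the proposed signature:

```ocaml
(* Sketch of a circuit-breaker wrapper around with_connection. *)
type breaker = {
  mutable state : circuit_breaker_state;
  mutable failures : int;
  mutable opened_at : float;
}

let with_circuit_breaker breaker pool endpoint ~failure_threshold ~cooldown f =
  (* After the cooldown, allow a single trial request (Open -> HalfOpen) *)
  (match breaker.state with
   | Open when Unix.gettimeofday () -. breaker.opened_at > cooldown ->
     breaker.state <- HalfOpen
   | _ -> ());
  match breaker.state with
  | Open -> failwith "circuit open: endpoint temporarily rejected"
  | Closed | HalfOpen ->
    (try
       let result = Conpool.with_connection pool endpoint f in
       (* Any success closes the circuit and resets the failure count *)
       breaker.state <- Closed;
       breaker.failures <- 0;
       result
     with e ->
       breaker.failures <- breaker.failures + 1;
       if breaker.failures >= failure_threshold then begin
         breaker.state <- Open;
         breaker.opened_at <- Unix.gettimeofday ()
       end;
       raise e)
```
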
## Testing Strategy

### Unit Tests

1. **Connection lifecycle** (see the test sketch at the end of this section):
   - Create → Use → Release → Reuse
   - Create → Use → Error → Close (not reused)

2. **Concurrency:**
   - Multiple fibers acquiring from the same endpoint
   - Limit enforcement (blocks at max_connections_per_endpoint)
   - Thread-safety of pool operations

3. **Cancellation:**
   - Cancel during connection creation
   - Cancel during release
   - Pool cleanup on switch release

### Integration Tests

1. **Real TCP servers:**
   - HTTP server with keep-alive
   - Echo server for connection reuse
   - Server that closes connections to test validation

2. **Performance:**
   - Connection reuse speedup (10x for 100 requests)
   - Concurrent request handling
   - Pool contention under load

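As a concrete starting point, a lifecycle test could run a tiny echo server and assert that two sequential requests share one pooled connection. This is only a sketch against the proposed API; the port number and the exact stats assertions are assumptions:

```ocaml
open Eio.Std

(* Sketch: connection-reuse test against a local echo server. *)
let test_connection_reuse env =
  Switch.run @@ fun sw ->
  let net = env#net in
  (* Tiny echo server on an arbitrary loopback port *)
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 9099) in
  let server = Eio.Net.listen ~sw ~reuse_addr:true ~backlog:1 net addr in
  Fiber.fork ~sw (fun () ->
    (* Accept a single client and echo everything back *)
    Eio.Net.accept_fork ~sw server ~on_error:raise (fun flow _addr ->
      Eio.Flow.copy flow flow));
  (* Two sequential requests should reuse one pooled connection *)
  let pool = Conpool.create ~sw ~net () in
  let endpoint = Conpool.{ host = "127.0.0.1"; port = 9099; tls = None } in
  for _ = 1 to 2 do
    Conpool.with_connection pool endpoint (fun conn ->
      Eio.Flow.copy_string "ping" conn;
      let buf = Cstruct.create 4 in
      ignore (Eio.Flow.single_read conn buf))
  done;
  let stats = Conpool.stats pool endpoint in
  assert (stats.Conpool.total_created = 1);
  assert (stats.total_reused >= 1);
  (* Close the pooled connection so the echo fiber sees EOF and exits *)
  Conpool.close_all_connections pool endpoint
```
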
## Project Structure

```
conpool/
├── CLAUDE.md (this file)
├── lib/
│   ├── ...
│   └── conpool_stats.ml (statistics tracking)
└── test/
    ├── test_lifecycle.ml
    ├── test_validation.ml
    ├── test_concurrency.ml
    └── test_integration.ml
```

## Comparison with Other Approaches

### vs. cohttp-lwt Connection_cache

cohttp-lwt has a `Connection_cache` module for HTTP connection pooling:

- **Limited:** HTTP-specific, tied to cohttp
- **Conpool:** Protocol-agnostic, reusable across libraries

### vs. Per-Request Connections (current requests implementation)

The current requests library creates a new connection per request:

- **Overhead:** 3-way handshake + TLS handshake on every request
- **Conpool:** Reuses connections, 5-10x speedup

### vs. Global Connection Pool

Some libraries use global singleton pools:

- **Inflexible:** Can't configure per-client
- **Conpool:** Pool per Conpool.t instance, fine-grained control

## Future Enhancements

1. **HTTP/2 & HTTP/3 Support**
   - Track streams per connection
   - Multiplexing-aware pooling

2. **Observability**
   - OpenTelemetry integration

3. **Advanced Health Checks**
   - Periodic background health checks
   - Adaptive health check frequency

4. **Connection Affinity**
   - Sticky connections for stateful protocols
   - Session-aware pooling

5. **Proxy Support**
   - HTTP CONNECT tunneling

6. **Unix Domain Sockets**
   - Pool UDS connections (e.g., local Redis, PostgreSQL)

## Summary

- ✅ **Protocol-agnostic** TCP/IP connection pooling
- ✅ **Eio.Pool foundation** for resource management
- ✅ **Per-endpoint** isolation and limits
- ✅ **Connection validation** (age, idle, health checks)
- ✅ **Cancel-safe** operations with `Cancel.protect`
- ✅ **Rich statistics** and monitoring
- ✅ **Drop-in integration** with requests, redis-eio, etc.

This design separates TCP pooling from protocol logic, making it a reusable foundation for any TCP-based Eio library.