Conpool - TCP/IP Connection Pool Library for Eio#
Overview#
Conpool is a protocol-agnostic TCP/IP connection pooling library built on Eio.Pool. It manages connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.).
Design Philosophy#
Separation of Concerns#
- Conpool: Manages TCP sockets and their lifecycle (connect, validate, close)
- Protocol Libraries (requests, redis-eio, etc.): Handle protocol-level logic (HTTP requests, Redis commands)
Key Principles#
- Protocol Agnostic: Works with any TCP-based protocol
- Eio.Pool Foundation: Leverages Eio.Pool for resource management and limits
- Per-Endpoint Pooling: Separate pool per (host, port, tls) tuple
- Connection Validation: Health checks, age limits, idle timeouts
- Structured Concurrency: All resources bound to switches
- Cancel-Safe: Critical operations protected with
Cancel.protect - Observable: Rich statistics and monitoring hooks
Architecture#
Core Types#
(** conpool.mli *)
type endpoint = {
host : string;
port : int;
tls : tls_config option;
}
and tls_config = {
config : Tls.Config.client;
servername : string option;
}
type connection
(** Opaque connection handle with metadata *)
type t
(** Connection pool managing multiple endpoints *)
type config = {
max_connections_per_endpoint : int;
(** Maximum connections per (host, port, tls) endpoint. Default: 10 *)
max_idle_time : float;
(** Maximum time (seconds) a connection can be idle before closure. Default: 60.0 *)
max_connection_lifetime : float;
(** Maximum lifetime (seconds) of any connection. Default: 300.0 *)
max_connection_uses : int option;
(** Maximum number of times a connection can be reused. None = unlimited. Default: None *)
health_check : (Eio.Flow.two_way -> bool) option;
(** Optional health check function. Called before reusing idle connection. Default: None *)
connect_timeout : float option;
(** Timeout for establishing new connections. Default: Some 10.0 *)
on_connection_created : (endpoint -> unit) option;
(** Hook called when new connection created. Default: None *)
on_connection_closed : (endpoint -> unit) option;
(** Hook called when connection closed. Default: None *)
on_connection_reused : (endpoint -> unit) option;
(** Hook called when connection reused from pool. Default: None *)
}
val default_config : config
(** Sensible defaults for most use cases *)
Pool Creation#
val create :
sw:Eio.Switch.t ->
net:#Eio.Net.t ->
?config:config ->
unit -> t
(** Create connection pool bound to switch.
All connections will be closed when switch is released. *)
Connection Acquisition & Release#
val with_connection :
t ->
endpoint ->
(Eio.Flow.two_way -> 'a) ->
'a
(** Acquire connection, use it, automatically release.
If idle connection available and healthy:
- Reuse from pool
Else:
- Create new connection (may block if endpoint at limit)
On success: connection returned to pool
On error: connection closed, not returned to pool
Example:
{[
Conpool.with_connection pool endpoint (fun conn ->
(* Use conn for HTTP request, Redis command, etc. *)
Eio.Flow.write conn request_bytes;
Eio.Flow.read conn response_buf
)
]}
*)
val acquire :
t ->
endpoint ->
connection
(** Manually acquire connection. Must call [release] or [close] later.
Use [with_connection] instead unless you need explicit control. *)
val release :
t ->
connection ->
unit
(** Return connection to pool. Connection must be in clean state.
If connection is unhealthy, call [close] instead. *)
val close :
t ->
connection ->
unit
(** Close connection immediately, remove from pool. *)
val get_flow :
connection ->
Eio.Flow.two_way
(** Extract underlying Eio flow from connection. *)
Connection Validation#
val is_healthy :
?check_readable:bool ->
connection ->
bool
(** Check if connection is healthy.
Validates:
- Not past max_connection_lifetime
- Not idle past max_idle_time
- Not exceeded max_connection_uses
- Optional: health_check function (from config)
- Optional: check_readable=true tests if socket still connected via 0-byte read
*)
val validate_and_release :
t ->
connection ->
unit
(** Validate connection health, then release to pool if healthy or close if not.
Equivalent to: if is_healthy conn then release pool conn else close pool conn *)
Statistics & Monitoring#
type endpoint_stats = {
active : int; (** Connections currently in use *)
idle : int; (** Connections in pool waiting to be reused *)
total_created : int; (** Total connections created (lifetime) *)
total_reused : int; (** Total times connections were reused *)
total_closed : int; (** Total connections closed *)
errors : int; (** Total connection errors *)
}
val stats :
t ->
endpoint ->
endpoint_stats
(** Get statistics for specific endpoint *)
val all_stats :
t ->
(endpoint * endpoint_stats) list
(** Get statistics for all endpoints in pool *)
val pp_stats :
Format.formatter ->
endpoint_stats ->
unit
(** Pretty-print endpoint statistics *)
Pool Management#
val close_idle_connections :
t ->
endpoint ->
unit
(** Close all idle connections for endpoint (keeps active ones) *)
val close_all_connections :
t ->
endpoint ->
unit
(** Close all connections for endpoint (blocks until active ones released) *)
val close_pool :
t ->
unit
(** Close entire pool. Blocks until all active connections released.
Automatically called when switch releases. *)
Implementation Details#
Per-Endpoint Pool Structure#
Each endpoint gets its own Eio.Pool.t managing connections to that destination:
(* Internal implementation *)
type connection_metadata = {
flow : Eio.Flow.two_way;
created_at : float;
mutable last_used : float;
mutable use_count : int;
endpoint : endpoint;
}
type connection = connection_metadata
type endpoint_pool = {
pool : connection Eio.Pool.t;
stats : endpoint_stats_mutable;
mutex : Eio.Mutex.t;
}
type t = {
sw : Eio.Switch.t;
net : Eio.Net.t;
config : config;
endpoints : (endpoint, endpoint_pool) Hashtbl.t;
endpoints_mutex : Eio.Mutex.t;
}
Connection Lifecycle with Eio.Pool#
let with_connection pool endpoint f =
(* Get or create endpoint pool *)
let ep_pool = get_or_create_endpoint_pool pool endpoint in
(* Use Eio.Pool.use for resource management *)
Eio.Pool.use ep_pool.pool (fun conn ->
(* Connection acquired from pool or newly created *)
(* Validate before use *)
if not (is_healthy ~check_readable:true conn) then (
(* Connection unhealthy, close and create new one *)
close_internal pool conn;
let new_conn = create_connection pool endpoint in
(* Use new connection with failure boundary *)
match f new_conn.flow with
| result ->
(* Success - update metadata and return to pool *)
conn.last_used <- Unix.gettimeofday ();
conn.use_count <- conn.use_count + 1;
result
| exception e ->
(* Error - close connection, don't return to pool *)
close_internal pool new_conn;
raise e
) else (
(* Connection healthy, use it *)
match f conn.flow with
| result ->
(* Success - update metadata and return to pool *)
conn.last_used <- Unix.gettimeofday ();
conn.use_count <- conn.use_count + 1;
result
| exception e ->
(* Error - close connection, don't return to pool *)
close_internal pool conn;
raise e
)
)
Eio.Pool Integration#
let create_endpoint_pool pool endpoint =
(* Eio.Pool.create manages resource limits *)
let eio_pool = Eio.Pool.create
pool.config.max_connections_per_endpoint
~validate:(fun conn ->
(* Called before reusing from pool *)
is_healthy ~check_readable:false conn
)
~dispose:(fun conn ->
(* Called when removing from pool *)
Eio.Cancel.protect (fun () ->
close_internal pool conn
)
)
(fun () ->
(* Factory: create new connection *)
create_connection pool endpoint
)
in
{
pool = eio_pool;
stats = create_stats ();
mutex = Eio.Mutex.create ();
}
Connection Creation with Timeout & TLS#
let create_connection pool endpoint =
(* Track stats *)
Eio.Mutex.use_rw pool.endpoints_mutex (fun () ->
let ep_pool = Hashtbl.find pool.endpoints endpoint in
ep_pool.stats.total_created <- ep_pool.stats.total_created + 1
);
(* Call hook if configured *)
Option.iter (fun f -> f endpoint) pool.config.on_connection_created;
(* Connect with optional timeout *)
let connect_with_timeout () =
let addr = `Tcp (
Eio.Net.Ipaddr.V4.loopback, (* TODO: resolve hostname *)
endpoint.port
) in
match pool.config.connect_timeout with
| Some timeout ->
Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
Eio.Net.connect ~sw:pool.sw pool.net addr
)
| None ->
Eio.Net.connect ~sw:pool.sw pool.net addr
in
let flow = connect_with_timeout () in
(* Wrap with TLS if configured *)
let flow = match endpoint.tls with
| None -> flow
| Some tls_cfg ->
let host = match tls_cfg.servername with
| Some name -> Domain_name.(host_exn (of_string_exn name))
| None -> Domain_name.(host_exn (of_string_exn endpoint.host))
in
Tls_eio.client_of_flow ~host tls_cfg.config flow
in
{
flow;
created_at = Unix.gettimeofday ();
last_used = Unix.gettimeofday ();
use_count = 0;
endpoint;
}
Connection Validation#
let is_healthy ?(check_readable = false) conn =
let now = Unix.gettimeofday () in
let config = (* get config from pool *) in
(* Check age *)
let age = now -. conn.created_at in
if age > config.max_connection_lifetime then
false
(* Check idle time *)
else if (now -. conn.last_used) > config.max_idle_time then
false
(* Check use count *)
else if (match config.max_connection_uses with
| Some max -> conn.use_count >= max
| None -> false) then
false
(* Optional: custom health check *)
else if (match config.health_check with
| Some check -> not (check conn.flow)
| None -> false) then
false
(* Optional: check if socket still connected *)
else if check_readable then
try
(* Try zero-byte read - if socket closed, will raise *)
let buf = Cstruct.create 0 in
let _ = Eio.Flow.single_read conn.flow buf in
true
with
| End_of_file -> false
| _ -> false
else
true
Graceful Shutdown with Cancel.protect#
let close_pool pool =
Eio.Cancel.protect (fun () ->
(* Close all endpoint pools *)
Hashtbl.iter (fun endpoint ep_pool ->
(* Eio.Pool.dispose will call our dispose function for each connection *)
Eio.Pool.dispose ep_pool.pool
) pool.endpoints;
Hashtbl.clear pool.endpoints
)
(* Register cleanup with switch *)
let create ~sw ~net ?(config = default_config) () =
let pool = {
sw;
net;
config;
endpoints = Hashtbl.create 16;
endpoints_mutex = Eio.Mutex.create ();
} in
(* Auto-cleanup on switch release *)
Eio.Switch.on_release sw (fun () ->
close_pool pool
);
pool
Usage Examples#
Example 1: HTTP Client Connection Pooling#
open Eio.Std
let () =
Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
(* Create connection pool *)
let pool = Conpool.create ~sw ~net:env#net () in
(* Define endpoint *)
let endpoint = Conpool.{
host = "example.com";
port = 443;
tls = Some {
config = my_tls_config;
servername = Some "example.com";
};
} in
(* Make 100 requests - connections will be reused *)
for i = 1 to 100 do
Conpool.with_connection pool endpoint (fun conn ->
(* Send HTTP request *)
let request = Printf.sprintf
"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" in
Eio.Flow.write conn [Cstruct.of_string request];
(* Read response *)
let buf = Cstruct.create 4096 in
let n = Eio.Flow.single_read conn buf in
Printf.printf "Response %d: %d bytes\n" i n
)
done;
(* Print statistics *)
let stats = Conpool.stats pool endpoint in
Printf.printf "Created: %d, Reused: %d\n"
stats.total_created stats.total_reused
Example 2: Redis Client with Health Checks#
let redis_health_check flow =
(* Send PING command *)
try
Eio.Flow.write flow [Cstruct.of_string "PING\r\n"];
let buf = Cstruct.create 7 in
let n = Eio.Flow.single_read flow buf in
(* Check for "+PONG\r\n" response *)
n = 7 && Cstruct.to_string buf = "+PONG\r\n"
with
| _ -> false
let () =
Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
let config = Conpool.{
default_config with
health_check = Some redis_health_check;
max_idle_time = 30.0; (* Redis connections timeout quickly *)
max_connections_per_endpoint = 50;
} in
let pool = Conpool.create ~sw ~net:env#net ~config () in
let redis_endpoint = Conpool.{
host = "localhost";
port = 6379;
tls = None;
} in
(* Connection automatically validated with PING before reuse *)
Conpool.with_connection pool redis_endpoint (fun conn ->
Eio.Flow.write conn [Cstruct.of_string "GET mykey\r\n"];
(* ... read response ... *)
)
Example 3: PostgreSQL with Connection Limits#
let () =
Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
let config = Conpool.{
default_config with
max_connections_per_endpoint = 20; (* PostgreSQL default max_connections *)
max_connection_lifetime = 3600.0; (* 1 hour max lifetime *)
max_connection_uses = Some 1000; (* Recycle after 1000 queries *)
} in
let pool = Conpool.create ~sw ~net:env#net ~config () in
let db_endpoint = Conpool.{
host = "db.example.com";
port = 5432;
tls = Some { config = tls_config; servername = None };
} in
(* 100 concurrent queries - limited to 20 connections *)
Eio.Fiber.all (List.init 100 (fun i ->
(fun () ->
Conpool.with_connection pool db_endpoint (fun conn ->
(* Execute query on conn *)
Printf.printf "Query %d\n" i
)
)
))
Example 4: Manual Connection Management#
(* Advanced: manual acquire/release for transactions *)
let with_transaction pool endpoint f =
let conn = Conpool.acquire pool endpoint in
try
(* Begin transaction *)
Eio.Flow.write (Conpool.get_flow conn)
[Cstruct.of_string "BEGIN\r\n"];
(* Execute user code *)
let result = f (Conpool.get_flow conn) in
(* Commit *)
Eio.Flow.write (Conpool.get_flow conn)
[Cstruct.of_string "COMMIT\r\n"];
(* Return connection to pool *)
Conpool.release pool conn;
result
with e ->
(* Rollback on error *)
Eio.Flow.write (Conpool.get_flow conn)
[Cstruct.of_string "ROLLBACK\r\n"];
(* Connection still usable, return to pool *)
Conpool.release pool conn;
raise e
Integration with Requests Library#
The requests library will use Conpool for TCP connection management:
(* requests/lib/one.ml *)
type ('a,'b) t = {
clock : 'a;
net : 'b;
default_headers : Headers.t;
timeout : Timeout.t;
(* ... other fields ... *)
connection_pool : Conpool.t; (* NEW *)
}
let request ~sw ?client ~method_ url =
let client = get_client client in
(* Parse URL to endpoint *)
let uri = Uri.of_string url in
let endpoint = uri_to_conpool_endpoint uri client.tls_config in
(* Acquire connection from pool *)
Conpool.with_connection client.connection_pool endpoint (fun tcp_conn ->
(* Create cohttp client wrapping this connection *)
let cohttp_client = make_cohttp_client_from_flow tcp_conn in
(* Make HTTP request over pooled connection *)
let resp, resp_body =
Cohttp_eio.Client.call ~sw cohttp_client method_ uri
~headers ?body
in
(* Must fully drain body before with_connection returns *)
(* Otherwise connection in inconsistent state *)
Eio.Cancel.protect (fun () ->
let body_data = Eio.Buf_read.parse_exn take_all resp_body
~max_size:max_int in
(* Connection auto-returned to pool when with_connection exits *)
Response.make ~sw ~status ~headers
~body:(string_source body_data) ~url ~elapsed
)
)
Advanced Features#
DNS Caching & Resolution#
Conpool will cache DNS lookups per hostname:
type t = {
(* ... existing fields ... *)
dns_cache : (string, Eio.Net.Sockaddr.t list) Hashtbl.t;
dns_cache_mutex : Eio.Mutex.t;
dns_cache_ttl : float; (* Default: 300.0 seconds *)
}
let resolve_hostname pool host port =
(* Check cache first *)
(* Fall back to Eio.Net.getaddrinfo_stream *)
(* Cache result with TTL *)
Connection Warming#
Pre-establish connections to reduce first-request latency:
val warm_endpoint :
t ->
endpoint ->
count:int ->
unit
(** Pre-create [count] connections to endpoint and add to pool *)
Circuit Breaker Integration#
type circuit_breaker_state =
| Closed (* Normal operation *)
| Open (* Failing, reject requests *)
| HalfOpen (* Testing if recovered *)
val with_circuit_breaker :
t ->
endpoint ->
failure_threshold:int ->
timeout:float ->
(Eio.Flow.two_way -> 'a) ->
'a
(** Wrap with_connection with circuit breaker pattern *)
Testing Strategy#
Unit Tests#
-
Connection lifecycle:
- Create → Use → Release → Reuse
- Create → Use → Error → Close (not reused)
-
Validation:
- Age expiration
- Idle timeout
- Use count limit
- Health check failure
-
Concurrency:
- Multiple fibers acquiring from same endpoint
- Limit enforcement (blocks at max_connections_per_endpoint)
- Thread-safety of pool operations
-
Cancel safety:
- Cancel during connection creation
- Cancel during use
- Cancel during release
- Pool cleanup on switch release
Integration Tests#
-
Real TCP servers:
- HTTP server with keep-alive
- Echo server for connection reuse
- Server that closes connections to test validation
-
Performance:
- Connection reuse speedup (10x for 100 requests)
- Concurrent request handling
- Pool contention under load
File Structure#
conpool/
├── CLAUDE.md (this file)
├── dune-project
├── conpool.opam
├── lib/
│ ├── dune
│ ├── conpool.ml
│ ├── conpool.mli
│ └── conpool_stats.ml (statistics tracking)
├── test/
│ ├── dune
│ ├── test_lifecycle.ml
│ ├── test_validation.ml
│ ├── test_concurrency.ml
│ └── test_integration.ml
└── examples/
├── http_client.ml
├── redis_client.ml
└── postgres_client.ml
Dependencies#
eio >= 1.0
tls-eio >= 1.0
Comparison with Other Approaches#
vs. cohttp-lwt Connection_cache#
cohttp-lwt has Connection_cache module for HTTP connection pooling:
- Limited: HTTP-specific, tied to cohttp
- Conpool: Protocol-agnostic, reusable across libraries
vs. Per-Request Connections (current requests impl)#
Current requests library creates new connection per request:
- Overhead: 3-way handshake + TLS handshake every request
- Conpool: Reuse connections, 5-10x speedup
vs. Global Connection Pool#
Some libraries use global singleton pools:
- Inflexible: Can't configure per-client
- Conpool: Pool per Conpool.t instance, fine-grained control
Future Enhancements#
-
HTTP/2 & HTTP/3 Support
- Track streams per connection
- Multiplexing-aware pooling
-
Metrics Export
- Prometheus exporter
- OpenTelemetry integration
-
Advanced Health Checks
- Periodic background health checks
- Adaptive health check frequency
-
Connection Affinity
- Sticky connections for stateful protocols
- Session-aware pooling
-
Proxy Support
- HTTP CONNECT tunneling
- SOCKS5 proxy
-
Unix Domain Sockets
- Pool UDS connections (e.g., local Redis, PostgreSQL)
Summary#
Conpool provides:
- ✅ Protocol-agnostic TCP/IP connection pooling
- ✅ Eio.Pool foundation for resource management
- ✅ Per-endpoint isolation and limits
- ✅ Connection validation (age, idle, health checks)
- ✅ Cancel-safe operations with
Cancel.protect - ✅ Rich statistics and monitoring
- ✅ Drop-in integration with requests, redis-eio, etc.
This design separates TCP pooling from protocol logic, making it a reusable foundation for any TCP-based Eio library.