TCP/TLS connection pooling for Eio
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
7
(** Log source for this library; every message in this file is emitted
    through the [Log] module derived from it. *)
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"

module Log = (val Logs.src_log src : Logs.LOG)

(* Re-export submodules so they are reachable as submodules of this module. *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd
17
18(** {1 Error Types} *)
19
(** Pool-level failures. They are raised wrapped in [Eio.Io] through the
    [E] extension of [Eio.Exn.err] and the [err] helper. *)
type error =
  (* Name lookup for [hostname] returned no stream addresses. *)
  | Dns_resolution_failed of { hostname : string }
  (* Connecting to [endpoint] failed; [attempts] is the attempt count being
     reported and [last_error] describes the most recent failure. *)
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
  (* The final attempt ran past the configured connect timeout ([timeout],
     printed in seconds by [pp_error]). *)
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
  (* NOTE(review): the two constructors below are not raised anywhere in the
     code visible here; presumably used by other modules — confirm. *)
  | Invalid_config of string
  | Invalid_endpoint of string
30
(** [pp_error ppf e] prints a one-line, human-readable description of [e]. *)
let pp_error ppf e =
  match e with
  | Invalid_config reason -> Fmt.pf ppf "Invalid configuration: %s" reason
  | Invalid_endpoint reason -> Fmt.pf ppf "Invalid endpoint: %s" reason
  | Dns_resolution_failed { hostname = host } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" host
  | Connection_timeout { endpoint = ep; timeout = secs } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp ep secs
  | Connection_failed { endpoint = ep; attempts = n; last_error = why } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp ep
        n why
42
(** Extension of Eio's error type carrying a pool [error]. Because it lives
    inside [Eio.Io], callers can annotate it with
    [Eio.Exn.reraise_with_context]. *)
type Eio.Exn.err += E of error

(** [err e] is the [Eio.Io] exception for [e]; use it as [raise (err e)]. *)
let err e =
  let wrapped = E e in
  Eio.Exn.create wrapped

(* Printer for [E]: writes "Conpool " followed by [pp_error]. It answers
   [false] for any other error so that other registered printers apply. *)
let () =
  let print_conpool_error ppf = function
    | E e ->
        Fmt.string ppf "Conpool ";
        pp_error ppf e;
        true
    | _ -> false
  in
  Eio.Exn.register_pp print_conpool_error
54
55(** {1 Connection Types} *)
56
(** Capabilities of a pooled connection: a closeable two-way flow. Both plain
    TCP sockets and TLS flows are coerced to this when created. *)
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]

(** The flow handed to callers for a pooled connection. *)
type connection = connection_ty Eio.Resource.t
59
(** Mutable per-endpoint counters. In this file every update happens while
    holding the owning [endpoint_pool]'s [mutex]; [snapshot_stats] copies them
    into an immutable [Stats.t]. *)
type endp_stats = {
  (* incremented per connection request, decremented when its lifecycle
     fiber ends *)
  mutable active : int;
  (* approximate idle count: decremented (floored at 0) when a connection is
     handed out, incremented when it is given back *)
  mutable idle : int;
  mutable total_created : int; (* successful connection creations *)
  mutable total_reused : int; (* pooled connections accepted on validation *)
  mutable total_closed : int; (* connections disposed of by the pool *)
  (* failed connection creations and failures while a connection was held *)
  mutable errors : int;
}
68
(** Per-endpoint state: the [Eio.Pool] of connections, its counters, and the
    mutex that serializes updates to those counters. *)
type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;
  stats : endp_stats;
  mutex : Eio.Mutex.t; (* guards [stats] *)
}
74
(** Pool state, parameterised over the clock and network capability types. *)
type ('clock, 'net) internal = {
  sw : Eio.Switch.t; (* sockets and lifecycle fibers are attached here *)
  net : 'net;
  clock : 'clock;
  config : Config.t;
  tls : Tls.Config.client option; (* [None] means plain TCP *)
  (* keyed with the polymorphic hash/equality on [Endpoint.t] *)
  endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
  endpoints_mutex : Eio.Mutex.t; (* taken around accesses to [endpoints] *)
}

(** Public pool handle. The constructor hides the concrete clock and network
    types (they are existential in [t]). *)
type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t
86
(* Hash table specialised to [Endpoint.t], hashing and comparing keys with
   [Endpoint.hash] / [Endpoint.equal].
   NOTE(review): not referenced in the code visible here — [internal.endpoints]
   uses the polymorphic [Hashtbl]. Either switch [endpoints] to this module
   or drop it. *)
module EndpointTbl = Hashtbl.Make (struct
  type t = Endpoint.t

  let equal = Endpoint.equal
  let hash = Endpoint.hash
end)
93
(** Current time, as a float, read from the pool's clock. *)
let get_time ({ clock; _ } : ('clock, 'net) internal) = Eio.Time.now clock
95
(** A fresh set of counters, all zero. Each call allocates a new mutable
    record. *)
let create_endp_stats () : endp_stats =
  {
    errors = 0;
    total_closed = 0;
    total_reused = 0;
    total_created = 0;
    idle = 0;
    active = 0;
  }
105
(** Copy the current counter values into an immutable [Stats.t]. The caller
    is responsible for holding the relevant mutex. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  let { active; idle; total_created; total_reused; total_closed; errors } =
    stats
  in
  Stats.make ~active ~idle ~total_created ~total_reused ~total_closed ~errors
110
111(** {1 DNS Resolution} *)
112
(** [resolve_endpoint pool endpoint] looks up [endpoint]'s host and port with
    [Eio.Net.getaddrinfo_stream] and returns the first address found.
    @raise Eio.Io carrying [E (Dns_resolution_failed _)] when the lookup
    returns no addresses. Every [Eio.Io] raised here gets the context
    ["resolving <endpoint>"]. *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m -> m "Resolving %a" Endpoint.pp endpoint);
  let lookup () =
    let service = string_of_int (Endpoint.port endpoint) in
    match
      Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service
    with
    | [] ->
        (* Context is attached by the handler below. *)
        let hostname = Endpoint.host endpoint in
        raise (err (Dns_resolution_failed { hostname }))
    | first :: _ ->
        Log.debug (fun m ->
            m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp
              first);
        first
  in
  try lookup ()
  with Eio.Io _ as ex ->
    let bt = Printexc.get_raw_backtrace () in
    Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
131
132(** {1 Connection Creation with Retry} *)
133
(* Delay to wait after failed attempt [attempt] (1-based) before the next one:
   exponential backoff, [connect_retry_delay * 2^(attempt - 1)]. *)
let retry_delay (pool : ('clock, 'net) internal) attempt =
  Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1))

(* Run the TLS client handshake over an already-connected [socket] and return
   the TLS flow as a [connection]. If the handshake fails for any reason, the
   socket is closed before the exception propagates. Otherwise it would stay
   open, attached to the pool's switch, until the whole pool shuts down.
   [Eio.Io] failures get a "TLS handshake with ..." context; any other
   exception is re-raised unchanged with its original backtrace. *)
let tls_handshake endpoint tls_config socket =
  try
    Log.debug (fun m ->
        m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
    let host =
      Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
    in
    let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
    Log.info (fun m ->
        m "TLS connection established to %a" Endpoint.pp endpoint);
    (tls_flow :> connection)
  with ex ->
    let bt = Printexc.get_raw_backtrace () in
    (* Best-effort close: the socket may already be unusable. *)
    Eio.Cancel.protect (fun () -> try Eio.Flow.close socket with _ -> ());
    (match ex with
     | Eio.Io _ ->
         Eio.Exn.reraise_with_context ex bt "TLS handshake with %a"
           Endpoint.pp endpoint
     | _ -> Printexc.raise_with_backtrace ex bt)

(** [create_connection_with_retry pool endpoint attempt last_error] opens a
    connection to [endpoint]: resolve, TCP connect (bounded by
    [Config.connect_timeout] when set), then a TLS handshake when [pool.tls]
    is set. At most [Config.connect_retry_count] attempts are made in total.
    [attempt] is the 1-based number of this attempt and [last_error]
    describes the previous failure.

    A connect timeout or an [Eio.Io] failure is retried after
    [retry_delay pool attempt]. Other exceptions (e.g. raised by the TLS
    library) are not retried and propagate unchanged.

    @raise Eio.Io carrying [E (Connection_failed _)] if [attempt] already
    exceeds the retry count.
    @raise Eio.Io carrying [E (Connection_timeout _)] (or [Connection_failed]
    when no timeout is configured) if the last attempt timed out.
    @raise Eio.Io the last attempt's I/O error, with added context, once
    retries are exhausted. *)
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
    attempt last_error =
  let retry_count = Config.connect_retry_count pool.config in
  if attempt > retry_count then
    (* With attempts starting at 1, this is only reached when the configured
       retry count is below 1. Context is added by whoever catches it. *)
    raise
      (err (Connection_failed { endpoint; attempts = retry_count; last_error }));

  Log.debug (fun m ->
      m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
        retry_count);

  (* Sleep for the backoff, then make the next attempt. *)
  let retry reason =
    Eio.Time.sleep pool.clock (retry_delay pool attempt);
    create_connection_with_retry pool endpoint (attempt + 1) reason
  in

  try
    let addr = resolve_endpoint pool endpoint in

    (* Connect with optional timeout. The socket is attached to [pool.sw]. *)
    let socket =
      try
        match Config.connect_timeout pool.config with
        | Some timeout ->
            Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
                Eio.Net.connect ~sw:pool.sw pool.net addr)
        | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
      with Eio.Io _ as ex ->
        let bt = Printexc.get_raw_backtrace () in
        Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp
          endpoint
    in

    Log.debug (fun m ->
        m "TCP connection established to %a" Endpoint.pp endpoint);

    let flow =
      match pool.tls with
      | None -> (socket :> connection)
      | Some tls_config -> tls_handshake endpoint tls_config socket
    in

    let now = get_time pool in
    Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
    {
      Connection.flow;
      created_at = now;
      last_used = now;
      use_count = 0;
      endpoint;
      mutex = Eio.Mutex.create ();
    }
  with
  | Eio.Time.Timeout ->
      Log.warn (fun m ->
          m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
      if attempt < retry_count then retry "Timeout"
      else begin
        (* Last attempt: report it with the pool's own error type. *)
        match Config.connect_timeout pool.config with
        | Some timeout -> raise (err (Connection_timeout { endpoint; timeout }))
        | None ->
            raise
              (err
                 (Connection_failed
                    { endpoint; attempts = attempt; last_error = "Timeout" }))
      end
  | Eio.Io _ as ex ->
      (* Capture the backtrace first: logging and [Printexc.to_string] below
         could otherwise overwrite it. *)
      let bt = Printexc.get_raw_backtrace () in
      let error_msg = Printexc.to_string ex in
      Log.warn (fun m ->
          m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
            endpoint error_msg);
      if attempt < retry_count then retry error_msg
      else Eio.Exn.reraise_with_context ex bt "after %d retry attempts" attempt
231
(** [create_connection pool endpoint] opens a new connection to [endpoint],
    starting at attempt 1 under the retry policy in [pool]'s config. *)
let create_connection (pool : ('clock, 'net) internal) endpoint =
  let first_attempt = 1 and initial_error = "No attempts made" in
  create_connection_with_retry pool endpoint first_attempt initial_error
234
235(** {1 Connection Validation} *)
236
(** [is_healthy pool ?check_readable conn] decides whether [conn] may be handed
    out again. It is rejected (result [false]) at the first of these checks
    that fails, tried in this order:
    - its age exceeds [Config.max_connection_lifetime];
    - the time since it was last used exceeds [Config.max_idle_time];
    - its use count reached [Config.max_connection_uses], when set;
    - the configured [Config.health_check] returns [false] or raises.
    Each rejection is logged at debug level. [check_readable] currently adds
    no extra check. *)
let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn =
  let now = get_time pool in
  let age = now -. Connection.created_at conn in
  (* Each predicate returns [true] when it finds a reason to reject the
     connection, logging that reason. *)
  let past_lifetime () =
    let max_lifetime = Config.max_connection_lifetime pool.config in
    if age > max_lifetime then begin
      Log.debug (fun m ->
          m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
            Endpoint.pp (Connection.endpoint conn) age max_lifetime);
      true
    end
    else false
  in
  let idle_too_long () =
    let max_idle = Config.max_idle_time pool.config in
    let idle_time = now -. Connection.last_used conn in
    if idle_time > max_idle then begin
      Log.debug (fun m ->
          m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
            Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
      true
    end
    else false
  in
  let uses_exhausted () =
    let exhausted =
      match Config.max_connection_uses pool.config with
      | None -> false
      | Some limit -> Connection.use_count conn >= limit
    in
    if exhausted then
      Log.debug (fun m ->
          m "Connection to %a unhealthy: exceeded max use count (%d)"
            Endpoint.pp (Connection.endpoint conn)
            (Connection.use_count conn));
    exhausted
  in
  let fails_health_check () =
    match Config.health_check pool.config with
    | None -> false
    | Some check -> (
        try
          let healthy = check (Connection.flow conn) in
          if not healthy then
            Log.debug (fun m ->
                m "Connection to %a failed custom health check" Endpoint.pp
                  (Connection.endpoint conn));
          not healthy
        with e ->
          Log.debug (fun m ->
              m "Connection to %a health check raised exception: %s"
                Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
          (* An exception from the health check counts as unhealthy. *)
          true)
  in
  if
    past_lifetime () || idle_too_long () || uses_exhausted ()
    || fails_health_check ()
  then false
  else if check_readable then
    (* TODO avsm: a sockopt for this? No readability probe is done yet. *)
    true
  else begin
    Log.debug (fun m ->
        m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
          Endpoint.pp (Connection.endpoint conn) age
          (now -. Connection.last_used conn)
          (Connection.use_count conn));
    true
  end
301
302(** {1 Internal Pool Operations} *)
303
(** [close_internal pool conn] closes [conn]'s flow and then runs the
    [on_connection_closed] hook, if configured. The close runs under
    [Eio.Cancel.protect], and any exception it raises is ignored
    (best-effort). *)
let close_internal (pool : ('clock, 'net) internal) conn =
  let endpoint = Connection.endpoint conn in
  Log.debug (fun m ->
      m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp endpoint
        (get_time pool -. Connection.created_at conn)
        (Connection.use_count conn));
  Eio.Cancel.protect (fun () ->
      match Eio.Flow.close (Connection.flow conn) with
      | () -> ()
      | exception _ -> ());
  match Config.on_connection_closed pool.config with
  | Some hook -> hook endpoint
  | None -> ()
318
(* Build the [endpoint_pool] for [endpoint]: an [Eio.Pool] whose validation,
   disposal and creation callbacks update fresh counters (under a fresh mutex)
   and call the hooks configured in [pool.config]. Connections are created
   lazily by [Eio.Pool], not here. *)
let make_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  let stats = create_endp_stats () in
  let mutex = Eio.Mutex.create () in
  let bump f = Eio.Mutex.use_rw ~protect:true mutex (fun () -> f stats) in
  (* Called before a pooled connection is reused. [is_healthy] already runs
     the configured [health_check], so it is not run a second time here;
     doing so doubled its cost and could count a connection as reused (and
     fire the hook) right before discarding it. *)
  let validate conn =
    let healthy = is_healthy pool ~check_readable:false conn in
    if healthy then begin
      bump (fun s -> s.total_reused <- s.total_reused + 1);
      Option.iter (fun f -> f endpoint) (Config.on_connection_reused pool.config)
    end;
    healthy
  in
  (* Called when the pool discards a connection. *)
  let dispose conn =
    Eio.Cancel.protect (fun () ->
        close_internal pool conn;
        bump (fun s -> s.total_closed <- s.total_closed + 1))
  in
  (* Called when the pool needs a new connection. Any failure (not only
     [Eio.Io], e.g. TLS errors) is counted, except cancellation; the original
     backtrace is preserved. *)
  let create_conn () =
    match create_connection pool endpoint with
    | conn ->
        bump (fun s -> s.total_created <- s.total_created + 1);
        Option.iter
          (fun f -> f endpoint)
          (Config.on_connection_created pool.config);
        conn
    | exception ex ->
        let bt = Printexc.get_raw_backtrace () in
        (match ex with
         | Eio.Cancel.Cancelled _ -> ()
         | _ -> bump (fun s -> s.errors <- s.errors + 1));
        Printexc.raise_with_backtrace ex bt
  in
  let eio_pool =
    Eio.Pool.create ~validate ~dispose
      (Config.max_connections_per_endpoint pool.config)
      create_conn
  in
  { pool = eio_pool; stats; mutex }

(** [get_or_create_endpoint_pool pool endpoint] returns the pool registered
    for [endpoint], creating and registering one on first use.

    Lookup and insertion happen in a single critical section on
    [pool.endpoints_mutex]. [Eio.Mutex] is an exclusive lock ([use_ro] is not
    a shared "read lock"), so a separate lookup-then-recheck pass would buy
    nothing. *)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
      match Hashtbl.find_opt pool.endpoints endpoint with
      | Some ep_pool -> ep_pool
      | None ->
          Log.info (fun m ->
              m "Creating endpoint pool for %a (max_connections=%d)"
                Endpoint.pp endpoint
                (Config.max_connections_per_endpoint pool.config));
          let ep_pool = make_endpoint_pool pool endpoint in
          Hashtbl.replace pool.endpoints endpoint ep_pool;
          ep_pool)
399
400(** {1 Public API - Pool Creation} *)
401
(** [create ~sw ~net ~clock ?tls ?config ()] returns an empty pool.

    Endpoint pools and connections are created lazily on first request.
    Connections are opened with [net] and attached to [sw]. When [tls] is
    given, every connection performs a TLS client handshake with it.
    [config] defaults to [Config.default]. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) () : t =
  Log.info (fun m ->
      m
        "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
         max_lifetime=%.1fs)"
        (Config.max_connections_per_endpoint config)
        (Config.max_idle_time config)
        (Config.max_connection_lifetime config));

  let pool =
    {
      sw;
      net;
      clock;
      config;
      tls;
      endpoints = Hashtbl.create 16;
      endpoints_mutex = Eio.Mutex.create ();
    }
  in

  (* When [sw] is released, forget every endpoint pool. No connection is
     closed explicitly here: each socket is opened with [~sw:pool.sw] (this
     [sw]), so the switch itself closes it as it finishes. (The previous
     no-op iteration over [endpoints] closed nothing.) Eio runs release
     handlers under [Eio.Cancel.protect]. *)
  Eio.Switch.on_release sw (fun () ->
      Log.info (fun m -> m "Closing connection pool");
      Hashtbl.reset pool.endpoints);

  T pool
438
439(** {1 Public API - Connection Management} *)
440
(** [connection_internal ~sw pool endpoint] checks a connection to [endpoint]
    out of [pool] for the lifetime of [sw] and returns its flow.

    The checkout is held by a daemon fiber forked under the pool's own switch,
    not the caller's [sw]. The daemon therefore outlives [sw] and can give the
    connection back to the [Eio.Pool] once [sw] is released. The flow reaches
    the caller through a promise.

    @raise exn whatever acquiring the connection raised (e.g. the [Eio.Io]
    errors of connection creation), re-raised in the caller's fiber with its
    original backtrace. Previously such a failure left the caller blocked
    forever and failed the pool's switch instead. *)
let connection_internal ~sw (T pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);

  (* Fail fast if the pool's switch is no longer running: the daemon forked
     under it below could not do its job, and the caller would wait forever. *)
  Eio.Switch.check pool.sw;

  let ep_pool = get_or_create_endpoint_pool pool endpoint in

  (* [conn_promise] carries either the flow or the exception (with backtrace)
     raised while acquiring it; [done_promise] is resolved when [sw] ends. *)
  let conn_promise, conn_resolver = Eio.Promise.create () in
  let done_promise, done_resolver = Eio.Promise.create () in

  let update_stats f =
    Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> f ep_pool.stats)
  in

  (* Register the release signal before starting the daemon, so that a
     failure here cannot leave a daemon holding a connection forever. *)
  Eio.Switch.on_release sw (fun () -> Eio.Promise.resolve done_resolver ());

  update_stats (fun s -> s.active <- s.active + 1);

  Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
      Fun.protect
        ~finally:(fun () ->
          update_stats (fun s -> s.active <- s.active - 1);
          Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
        (fun () ->
          try
            Eio.Pool.use ep_pool.pool (fun conn ->
                Log.debug (fun m ->
                    m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
                      (Connection.use_count conn));

                Connection.update_usage conn ~now:(get_time pool);

                (* Connection taken from the idle set. *)
                update_stats (fun s -> s.idle <- max 0 (s.idle - 1));

                Eio.Promise.resolve conn_resolver (Ok conn.flow);

                try
                  (* Hold the connection until the caller's switch ends;
                     [Eio.Pool.use] then puts it back in the pool. *)
                  Eio.Promise.await done_promise;
                  update_stats (fun s -> s.idle <- s.idle + 1);
                  `Stop_daemon
                with e ->
                  (* Only the daemon's own wait can fail here (e.g. when the
                     pool's switch is cancelled). Close the connection so it
                     is not reused, then propagate. *)
                  let bt = Printexc.get_raw_backtrace () in
                  close_internal pool conn;
                  update_stats (fun s -> s.errors <- s.errors + 1);
                  Printexc.raise_with_backtrace e bt)
          with ex when not (Eio.Promise.is_resolved conn_promise) ->
            (* Acquisition failed before hand-off: report it to the caller
               instead of failing the pool's switch. *)
            let bt = Printexc.get_raw_backtrace () in
            Eio.Promise.resolve conn_resolver (Error (ex, bt));
            `Stop_daemon));

  match Eio.Promise.await conn_promise with
  | Ok flow -> flow
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
508
(** [connection ~sw t endpoint] returns a connection to [endpoint] that stays
    checked out of [t] until [sw] is released. *)
let connection ~sw t endpoint = connection_internal t endpoint ~sw

(** [with_connection t endpoint f] runs [f] on a connection to [endpoint]
    inside a fresh switch. The checkout ends when that switch is released,
    i.e. when [f] returns or raises. *)
let with_connection t endpoint f =
  Eio.Switch.run (fun sw ->
      let conn = connection ~sw t endpoint in
      f conn)
513
514(** {1 Public API - Statistics} *)
515
(** [stats t endpoint] is a snapshot of the counters for [endpoint], or all
    zeros if no pool exists for it yet.

    The endpoint table is read under [endpoints_mutex], like every other
    access to it; the original read it unlocked. The counters are then copied
    under the endpoint's own mutex. The two locks are taken one after the
    other, never nested. *)
let stats (T pool) endpoint =
  let found =
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
        Hashtbl.find_opt pool.endpoints endpoint)
  in
  match found with
  | Some ep_pool ->
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None ->
      (* No pool for this endpoint yet: report zeroed counters. *)
      snapshot_stats (create_endp_stats ())
524
(** [all_stats t] lists [(endpoint, snapshot)] for every endpoint that
    currently has a pool. Each snapshot is copied under that endpoint's mutex
    while [endpoints_mutex] is held. *)
let all_stats (T pool) =
  let snapshot_of ep_pool =
    Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  in
  let collect endpoint ep_pool acc = (endpoint, snapshot_of ep_pool) :: acc in
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.fold collect pool.endpoints [])
535
536(** {1 Public API - Pool Management} *)
537
(** [clear_endpoint t endpoint] forgets the pool for [endpoint]; the next
    request for it creates a fresh pool.

    Connections belonging to the removed pool are not closed by this call.
    Idle ones stay open until the pool's switch finishes, since their sockets
    are attached to it. Checked-out ones are handed back to the removed pool
    when their holder's switch is released. *)
let clear_endpoint (T pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  (* Test and remove in one critical section, so that no other fiber can
     register a pool for [endpoint] between the two steps. *)
  let removed =
    Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
        if Hashtbl.mem pool.endpoints endpoint then begin
          Hashtbl.remove pool.endpoints endpoint;
          true
        end
        else false)
  in
  if not removed then
    Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)