(* TCP/TLS connection pooling for Eio *)
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)

(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
7
(* Log source for this library; [Log] is the logging module bound to it. *)
let src = Logs.Src.create ~doc:"Connection pooling library" "conpool"

module Log = (val Logs.src_log src : Logs.LOG)
11
(* Re-export the sibling modules so that they are reachable through this
   module. *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd
17
18(** {1 Error Types} *)
19
(** Reasons a pool operation can fail. *)
type error =
  | Dns_resolution_failed of { hostname : string }
      (** Resolving [hostname] produced no address. *)
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
      (** Connecting to [endpoint] failed; [attempts] is the number of
          attempts counted and [last_error] the printed form of the last
          exception seen. *)
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
      (** The last connection attempt to [endpoint] exceeded the configured
          [timeout]. *)
  | Invalid_config of string
      (** Configuration problem, with a message. Not raised by the code
          visible in this file. *)
  | Invalid_endpoint of string
      (** Endpoint problem, with a message. Not raised by the code visible in
          this file. *)

(** Exception carrying an {!error}. *)
exception Pool_error of error
32
(** [pp_error ppf e] prints a human-readable description of [e] on [ppf]. *)
let pp_error ppf err =
  match err with
  | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
  | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
  | Dns_resolution_failed { hostname } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
  | Connection_timeout { endpoint; timeout } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
        timeout
  | Connection_failed { endpoint; attempts; last_error } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
        endpoint attempts last_error
44
(** Extension of [Eio.Exn.err] that wraps a pool {!error}. *)
type Eio.Exn.err += E of error

(** [err e] is the exception built by [Eio.Exn.create] from [E e]. *)
let err e = E e |> Eio.Exn.create
48
(* Register a printer for [E] with Eio. The printer returns [true] when it
   handled the value and [false] for every other [Eio.Exn.err]. *)
let () =
  let pp_conpool_err ppf = function
    | E e ->
        Fmt.string ppf "Conpool ";
        pp_error ppf e;
        true
    | _ -> false
  in
  Eio.Exn.register_pp pp_conpool_err
56
57(** {1 Connection Types} *)
58
(** Capabilities of a pooled connection: it can be closed and used as a
    two-way flow. *)
type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]

(** An Eio resource with the {!connection_ty} capabilities. *)
type connection = connection_ty Eio.Resource.t
61
(** Mutable per-endpoint counters. Writers in this file update them while
    holding the owning {!endpoint_pool}'s mutex. *)
type endp_stats = {
  mutable active : int;  (** Connections currently handed out. *)
  mutable idle : int;  (** Connections returned and waiting for reuse. *)
  mutable total_created : int;  (** Successful factory calls. *)
  mutable total_reused : int;  (** Connections that passed validation. *)
  mutable total_closed : int;  (** Connections disposed of by the pool. *)
  mutable errors : int;  (** Factory failures and errors during use. *)
}
70
(** Everything kept for a single endpoint. *)
type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;  (** The underlying Eio resource pool. *)
  stats : endp_stats;  (** Counters for this endpoint. *)
  mutex : Eio.Mutex.t;  (** Guards [stats]. *)
}
76
(** Pool state, parameterised by the clock and network types. *)
type ('clock, 'net) internal = {
  sw : Eio.Switch.t;  (** Switch that new sockets are attached to. *)
  net : 'net;  (** Network used for name resolution and connecting. *)
  clock : 'clock;  (** Clock used for timeouts, backoff and timestamps. *)
  config : Config.t;  (** Pool configuration. *)
  tls : Tls.Config.client option;
      (** When present, connections are wrapped in TLS. *)
  endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
      (** One {!endpoint_pool} per endpoint seen so far. *)
  endpoints_mutex : Eio.Mutex.t;  (** Guards [endpoints]. *)
}
86
(** A connection pool. The constructor hides the concrete clock and network
    type parameters of {!internal}. *)
type t =
  | T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t
88
(** Hash tables keyed by endpoints, using [Endpoint.equal] and
    [Endpoint.hash].

    NOTE(review): nothing in the visible part of this file uses this module;
    the [endpoints] field of {!internal} is a polymorphic [Hashtbl.t]. *)
module EndpointTbl = Hashtbl.Make (struct
  type t = Endpoint.t

  let hash = Endpoint.hash
  let equal = Endpoint.equal
end)
95
(** [get_time pool] is the current time according to the pool's clock. *)
let get_time ({ clock; _ } : ('clock, 'net) internal) = Eio.Time.now clock
97
(** [create_endp_stats ()] is a fresh set of counters, all at zero. *)
let create_endp_stats () =
  let zero = 0 in
  {
    active = zero;
    idle = zero;
    total_created = zero;
    total_reused = zero;
    total_closed = zero;
    errors = zero;
  }
107
(** [snapshot_stats stats] copies the current counter values into a
    [Stats.t]. It takes no lock itself. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  let { active; idle; total_created; total_reused; total_closed; errors } =
    stats
  in
  Stats.make ~active ~idle ~total_created ~total_reused ~total_closed ~errors
112
113(** {1 DNS Resolution} *)
114
(** [resolve_endpoint pool endpoint] looks up the stream addresses for the
    endpoint's host and port and returns the first one.

    @raise Pool_error with [Dns_resolution_failed] when the lookup returns no
    address. *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
  let hostname = Endpoint.host endpoint in
  let service = string_of_int (Endpoint.port endpoint) in
  let candidates = Eio.Net.getaddrinfo_stream pool.net hostname ~service in
  Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
  match candidates with
  | [] ->
      Log.err (fun m -> m "Failed to resolve hostname: %s" hostname);
      raise (Pool_error (Dns_resolution_failed { hostname }))
  | first :: _ ->
      (* Only the first address is used; the others are ignored. *)
      Log.debug (fun m ->
          m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp first);
      first
133
134(** {1 Connection Creation with Retry} *)
135
(** [create_connection_with_retry pool endpoint attempt last_error] opens a
    connection to [endpoint], retrying on failure.

    [attempt] is the 1-based number of the attempt about to be made and
    [last_error] the printed form of the previous failure (used only for
    error reporting). Each attempt resolves the endpoint, connects (bounded by
    [Config.connect_timeout] when one is set) and, when the pool has a TLS
    configuration, wraps the socket in a TLS client flow. A failed attempt is
    retried after [Config.connect_retry_delay *. 2 ** (attempt - 1)] seconds
    until [Config.connect_retry_count] attempts have been made.

    Cancellation is neither retried nor converted: [Eio.Cancel.Cancelled] is
    re-raised unchanged. If TLS setup fails, the TCP socket is closed before
    the failure is handled, so failed attempts do not leave sockets attached
    to the pool's switch.

    @raise Pool_error
      with [Connection_timeout] when the last attempt timed out and a timeout
      is configured, and with [Connection_failed] otherwise. *)
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
    attempt last_error =
  let retry_count = Config.connect_retry_count pool.config in
  if attempt > retry_count then begin
    Log.err (fun m ->
        m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
          retry_count);
    raise
      (Pool_error
         (Connection_failed { endpoint; attempts = retry_count; last_error }))
  end;

  Log.debug (fun m ->
      m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
        retry_count);

  (* Sleep with exponential backoff, then make the next attempt. *)
  let retry_after_backoff error_msg =
    let delay =
      Config.connect_retry_delay pool.config
      *. (2.0 ** float_of_int (attempt - 1))
    in
    Eio.Time.sleep pool.clock delay;
    create_connection_with_retry pool endpoint (attempt + 1) error_msg
  in
  (* Report a definitive failure after [attempt] attempts. *)
  let give_up error_msg =
    raise
      (Pool_error
         (Connection_failed
            { endpoint; attempts = attempt; last_error = error_msg }))
  in

  try
    let addr = resolve_endpoint pool endpoint in
    Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);

    (* Connect with optional timeout *)
    let socket =
      match Config.connect_timeout pool.config with
      | Some timeout ->
          Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
              Eio.Net.connect ~sw:pool.sw pool.net addr)
      | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
    in

    Log.debug (fun m ->
        m "TCP connection established to %a" Endpoint.pp endpoint);

    let flow =
      match pool.tls with
      | None -> (socket :> connection)
      | Some tls_config -> (
          Log.debug (fun m ->
              m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
          try
            let host =
              Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
            in
            let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
            Log.info (fun m ->
                m "TLS connection established to %a" Endpoint.pp endpoint);
            (tls_flow :> connection)
          with e ->
            (* The socket is attached to the long-lived pool switch, so it
               must be closed here or it would stay open until that switch
               finishes. Closing is best effort. *)
            let bt = Printexc.get_raw_backtrace () in
            Eio.Cancel.protect (fun () ->
                try Eio.Flow.close socket with _ -> ());
            Printexc.raise_with_backtrace e bt)
    in

    let now = get_time pool in
    Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
    {
      Connection.flow;
      created_at = now;
      last_used = now;
      use_count = 0;
      endpoint;
      mutex = Eio.Mutex.create ();
    }
  with
  | Eio.Cancel.Cancelled _ as e ->
      (* Never treat cancellation as a connection failure. *)
      raise e
  | Eio.Time.Timeout as e ->
      Log.warn (fun m ->
          m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
      let error_msg = Printexc.to_string e in
      if attempt >= retry_count then
        (* Last attempt - convert to our error type *)
        (match Config.connect_timeout pool.config with
        | Some timeout ->
            raise (Pool_error (Connection_timeout { endpoint; timeout }))
        | None -> give_up error_msg)
      else retry_after_backoff error_msg
  | e ->
      (* Other errors - retry with backoff *)
      let error_msg = Printexc.to_string e in
      Log.warn (fun m ->
          m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
            endpoint error_msg);
      if attempt < retry_count then retry_after_backoff error_msg
      else give_up error_msg
236
(** [create_connection pool endpoint] starts the retrying connection
    procedure at attempt 1, with a placeholder for the previous error. *)
let create_connection (pool : ('clock, 'net) internal) endpoint =
  let first_attempt = 1 in
  let no_error_yet = "No attempts made" in
  create_connection_with_retry pool endpoint first_attempt no_error_yet
239
240(** {1 Connection Validation} *)
241
(** [is_healthy pool ?check_readable conn] decides whether [conn] may still
    be used. The checks run in this order and stop at the first failure:
    maximum lifetime, maximum idle time, maximum use count (when configured)
    and the custom health check (when configured; an exception raised by the
    check counts as a failure).

    When [check_readable] is [true] no additional probe is performed yet: the
    function returns [true] without emitting the final "healthy" debug
    message. *)
let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn =
  let now = get_time pool in
  let age = now -. Connection.created_at conn in

  (* Each predicate below returns [true] when the connection must be
     rejected, logging the reason. They are thunks so that a later check is
     only evaluated when all earlier ones passed. *)
  let too_old () =
    let max_lifetime = Config.max_connection_lifetime pool.config in
    let expired = age > max_lifetime in
    if expired then
      Log.debug (fun m ->
          m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
            Endpoint.pp (Connection.endpoint conn) age max_lifetime);
    expired
  in
  let too_idle () =
    let max_idle = Config.max_idle_time pool.config in
    let idle_time = now -. Connection.last_used conn in
    let stale = idle_time > max_idle in
    if stale then
      Log.debug (fun m ->
          m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
            Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
    stale
  in
  let worn_out () =
    match Config.max_connection_uses pool.config with
    | None -> false
    | Some limit ->
        let exhausted = Connection.use_count conn >= limit in
        if exhausted then
          Log.debug (fun m ->
              m "Connection to %a unhealthy: exceeded max use count (%d)"
                Endpoint.pp (Connection.endpoint conn)
                (Connection.use_count conn));
        exhausted
  in
  let fails_custom_check () =
    match Config.health_check pool.config with
    | None -> false
    | Some check -> (
        try
          if check (Connection.flow conn) then false
          else begin
            Log.debug (fun m ->
                m "Connection to %a failed custom health check" Endpoint.pp
                  (Connection.endpoint conn));
            true
          end
        with e ->
          Log.debug (fun m ->
              m "Connection to %a health check raised exception: %s"
                Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
          (* Exception in health check = unhealthy *)
          true)
  in

  if too_old () || too_idle () || worn_out () || fails_custom_check () then
    false
  else if check_readable then
    (* TODO avsm: a sockopt for this? *)
    true
  else begin
    Log.debug (fun m ->
        m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
          Endpoint.pp (Connection.endpoint conn) age
          (now -. Connection.last_used conn)
          (Connection.use_count conn));
    true
  end
305 end
306
307(** {1 Internal Pool Operations} *)
308
(** [close_internal pool conn] closes the flow of [conn] and then calls the
    [Config.on_connection_closed] hook, if any, with the connection's
    endpoint. Closing is best effort: any exception raised by the close is
    ignored, and the close itself is protected from cancellation. *)
let close_internal (pool : ('clock, 'net) internal) conn =
  Log.debug (fun m ->
      m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
        (Connection.endpoint conn)
        (get_time pool -. Connection.created_at conn)
        (Connection.use_count conn));

  let close_quietly () =
    match Eio.Flow.close (Connection.flow conn) with
    | () -> ()
    | exception _ -> ()
  in
  Eio.Cancel.protect close_quietly;

  (* Notify the hook, when one is configured. *)
  match Config.on_connection_closed pool.config with
  | None -> ()
  | Some hook -> hook (Connection.endpoint conn)
323
(** [get_or_create_endpoint_pool pool endpoint] returns the {!endpoint_pool}
    registered for [endpoint], creating and registering it first when there is
    none.

    A new entry owns an [Eio.Pool] of at most
    [Config.max_connections_per_endpoint] connections whose callbacks keep
    the endpoint's counters up to date:
    - the factory opens a connection with {!create_connection}, bumps
      [total_created] and calls the [on_connection_created] hook; on failure
      it bumps [errors] and re-raises;
    - validation delegates to {!is_healthy} (which already includes the
      configured custom health check, so the check runs exactly once per
      validation); only a connection accepted for reuse bumps [total_reused]
      and triggers the [on_connection_reused] hook;
    - disposal closes the connection and bumps [total_closed]. *)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m ->
      m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);

  (* Fast path: plain lookup. [use_ro] is used because the lookup does not
     modify the table. *)
  match
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
        Hashtbl.find_opt pool.endpoints endpoint)
  with
  | Some ep_pool ->
      Log.debug (fun m ->
          m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
      ep_pool
  | None ->
      Log.debug (fun m ->
          m "No existing pool, need to create for %a" Endpoint.pp endpoint);
      (* Slow path: take the lock for writing and look again, since another
         fiber may have registered the endpoint in the meantime. *)
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          match Hashtbl.find_opt pool.endpoints endpoint with
          | Some ep_pool ->
              Log.debug (fun m ->
                  m "Another fiber created pool for %a" Endpoint.pp endpoint);
              ep_pool
          | None ->
              let stats = create_endp_stats () in
              let mutex = Eio.Mutex.create () in
              (* Run a counter update while holding this endpoint's mutex. *)
              let update_stats f = Eio.Mutex.use_rw ~protect:true mutex f in
              let max_connections =
                Config.max_connections_per_endpoint pool.config
              in

              Log.info (fun m ->
                  m "Creating new endpoint pool for %a (max_connections=%d)"
                    Endpoint.pp endpoint max_connections);

              Log.debug (fun m ->
                  m "About to create Eio.Pool for %a" Endpoint.pp endpoint);

              (* Called by Eio.Pool before handing out a pooled connection. *)
              let validate conn =
                Log.debug (fun m ->
                    m "Validate called for connection to %a" Endpoint.pp
                      endpoint);
                let healthy = is_healthy pool ~check_readable:false conn in
                if healthy then begin
                  Log.debug (fun m ->
                      m "Reusing connection to %a from pool" Endpoint.pp
                        endpoint);
                  update_stats (fun () ->
                      stats.total_reused <- stats.total_reused + 1);
                  Option.iter
                    (fun f -> f endpoint)
                    (Config.on_connection_reused pool.config)
                end
                else
                  Log.debug (fun m ->
                      m "Connection to %a failed validation, creating new one"
                        Endpoint.pp endpoint);
                healthy
              in

              (* Called by Eio.Pool when a connection is removed from it. *)
              let dispose conn =
                Eio.Cancel.protect (fun () ->
                    close_internal pool conn;
                    update_stats (fun () ->
                        stats.total_closed <- stats.total_closed + 1))
              in

              (* Called by Eio.Pool when a new connection is needed. *)
              let alloc () =
                Log.debug (fun m ->
                    m "Factory function called for %a" Endpoint.pp endpoint);
                try
                  let conn = create_connection pool endpoint in

                  Log.debug (fun m ->
                      m "Connection created successfully for %a" Endpoint.pp
                        endpoint);

                  update_stats (fun () ->
                      stats.total_created <- stats.total_created + 1);

                  Option.iter
                    (fun f -> f endpoint)
                    (Config.on_connection_created pool.config);

                  conn
                with e ->
                  Log.err (fun m ->
                      m "Factory function failed for %a: %s" Endpoint.pp
                        endpoint (Printexc.to_string e));
                  update_stats (fun () -> stats.errors <- stats.errors + 1);
                  raise e
              in

              let eio_pool =
                Eio.Pool.create max_connections ~validate ~dispose alloc
              in

              Log.debug (fun m ->
                  m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);

              let ep_pool = { pool = eio_pool; stats; mutex } in

              Hashtbl.add pool.endpoints endpoint ep_pool;
              Log.debug (fun m ->
                  m "Endpoint pool added to hashtable for %a" Endpoint.pp
                    endpoint);
              ep_pool)
445 ep_pool)
446
447(** {1 Public API - Pool Creation} *)
448
(** [create ~sw ~net ~clock ?tls ?config ()] builds an empty connection pool.

    @param sw switch that every socket opened by the pool is attached to; a
    release handler registered on it clears the endpoint table.
    @param net network used for name resolution and connecting.
    @param clock clock used for timeouts, backoff and timestamps.
    @param tls when given, connections are wrapped in TLS using this client
    configuration.
    @param config pool settings; defaults to [Config.default]. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) () : t =
  Log.info (fun m ->
      m
        "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
         max_lifetime=%.1fs)"
        (Config.max_connections_per_endpoint config)
        (Config.max_idle_time config)
        (Config.max_connection_lifetime config));

  let pool =
    {
      sw;
      net;
      clock;
      config;
      tls;
      endpoints = Hashtbl.create 16;
      endpoints_mutex = Eio.Mutex.create ();
    }
  in

  (* Auto-cleanup on switch release. Nothing is closed explicitly here:
     sockets are opened with [~sw], so only the bookkeeping table needs to be
     emptied. *)
  Eio.Switch.on_release sw (fun () ->
      Eio.Cancel.protect (fun () ->
          Log.info (fun m -> m "Closing connection pool");
          Hashtbl.clear pool.endpoints));

  T pool
484 T pool
485
486(** {1 Public API - Connection Management} *)
487
(** [connection_internal ~sw t endpoint] borrows a connection to [endpoint]
    for the lifetime of [sw] and returns its flow.

    A daemon fiber forked on [sw] holds the connection inside [Eio.Pool.use]
    until [sw] is done with it; leaving [Eio.Pool.use] is what gives the
    connection back to the pool. The [active] and [idle] counters of the
    endpoint are updated along the way.

    Release signal: Eio cancels daemon fibers once every non-daemon fiber of
    [sw] has finished, whereas release handlers only run after all fibers
    (daemons included) are done. The daemon therefore normally learns that
    [sw] is finishing through cancellation rather than through
    [done_promise], and that case is handled as a regular release. Any other
    exception is treated as an error: the connection is closed and [errors]
    is incremented.

    NOTE(review): recognising the normal shutdown as [Cancelled Exit] is
    believed to match what [Eio.Fiber.fork_daemon] itself treats as a normal
    daemon stop — confirm against the Eio version in use.

    NOTE(review): [Eio.Pool.use] appears to return the resource to the pool
    even when the callback raises, so a connection closed on the error path
    may be offered again unless validation rejects it — verify. *)
let connection_internal ~sw (T pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = get_or_create_endpoint_pool pool endpoint in

  (* Create promises for connection handoff and cleanup signal *)
  let conn_promise, conn_resolver = Eio.Promise.create () in
  let done_promise, done_resolver = Eio.Promise.create () in

  (* Run a counter update while holding this endpoint's mutex. *)
  let update_stats f = Eio.Mutex.use_rw ~protect:true ep_pool.mutex f in

  (* Increment active count *)
  update_stats (fun () -> ep_pool.stats.active <- ep_pool.stats.active + 1);

  (* Fork a daemon fiber to manage the connection lifecycle *)
  Eio.Fiber.fork_daemon ~sw (fun () ->
      Fun.protect
        ~finally:(fun () ->
          (* Decrement active count *)
          update_stats (fun () ->
              ep_pool.stats.active <- ep_pool.stats.active - 1);
          Log.debug (fun m ->
              m "Released connection to %a" Endpoint.pp endpoint))
        (fun () ->
          (* Use Eio.Pool for resource management *)
          Eio.Pool.use ep_pool.pool (fun conn ->
              Log.debug (fun m ->
                  m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
                    (Connection.use_count conn));

              (* Update last used time and use count *)
              Connection.update_usage conn ~now:(get_time pool);

              (* Update idle stats (connection taken from idle pool) *)
              update_stats (fun () ->
                  ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));

              (* Hand off connection to caller *)
              Eio.Promise.resolve conn_resolver (Connection.flow conn);

              (* Normal release: the connection goes back to the idle set
                 when [Eio.Pool.use] returns. *)
              let return_to_pool () =
                update_stats (fun () ->
                    ep_pool.stats.idle <- ep_pool.stats.idle + 1);
                `Stop_daemon
              in

              try
                (* Wait until the switch is done with the connection *)
                Eio.Promise.await done_promise;
                return_to_pool ()
              with
              | Eio.Cancel.Cancelled Exit ->
                  (* The switch finished normally and cancelled its daemons:
                     this is the regular end of the borrow, not an error. *)
                  return_to_pool ()
              | e ->
                  (* Error - close the connection *)
                  Log.warn (fun m ->
                      m "Error with connection to %a: %s" Endpoint.pp endpoint
                        (Printexc.to_string e));
                  close_internal pool conn;

                  (* Update error stats *)
                  update_stats (fun () ->
                      ep_pool.stats.errors <- ep_pool.stats.errors + 1);

                  raise e)));

  (* Signal cleanup when switch ends *)
  Eio.Switch.on_release sw (fun () -> Eio.Promise.resolve done_resolver ());

  (* Return the connection *)
  Eio.Promise.await conn_promise
553 Eio.Promise.await conn_promise
554
(** [connection ~sw t endpoint] is a flow connected to [endpoint], borrowed
    from the pool [t] for the lifetime of [sw]. *)
let connection ~sw t endpoint =
  let flow = connection_internal ~sw t endpoint in
  flow
556
(** [with_connection t endpoint f] runs [f] on a connection to [endpoint]
    inside a fresh switch and returns the result of [f]. *)
let with_connection t endpoint f =
  Eio.Switch.run @@ fun sw ->
  let flow = connection ~sw t endpoint in
  f flow
559
(** [with_connection_exn t endpoint f] is {!with_connection} except that a
    [Pool_error e] escaping from it is re-raised as the Eio exception
    [err e]. Other exceptions pass through unchanged. *)
let with_connection_exn t endpoint f =
  match with_connection t endpoint f with
  | result -> result
  | exception Pool_error e -> raise (err e)
562
563(** {1 Public API - Statistics} *)
564
(** [stats t endpoint] is a snapshot of the counters kept for [endpoint], or
    all zeros when no pool exists for it yet.

    The endpoint table is read while holding [endpoints_mutex], like the
    other readers in this file; the counters are read under the endpoint's
    own mutex. *)
let stats (T pool) endpoint =
  let found =
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
        Hashtbl.find_opt pool.endpoints endpoint)
  in
  match found with
  | Some ep_pool ->
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None ->
      (* No pool for this endpoint yet *)
      Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
        ~total_closed:0 ~errors:0
573
(** [all_stats t] lists every known endpoint paired with a snapshot of its
    counters. The table is traversed while holding [endpoints_mutex]; each
    snapshot is taken under the corresponding endpoint mutex. *)
let all_stats (T pool) =
  let snapshot ep_pool =
    Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  in
  let collect endpoint ep_pool acc = (endpoint, snapshot ep_pool) :: acc in
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.fold collect pool.endpoints [])
584
585(** {1 Public API - Pool Management} *)
586
(** [clear_endpoint t endpoint] forgets the pool registered for [endpoint],
    if any. The lookup and the removal happen under a single acquisition of
    [endpoints_mutex], so the entry cannot change between the two.

    NOTE(review): this only removes the table entry. Nothing here closes the
    connections held by the removed [Eio.Pool]; sockets are attached to the
    pool's switch when they are opened, so they presumably stay open until
    that switch finishes — confirm this is intended. *)
let clear_endpoint (T pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  let removed =
    Eio.Cancel.protect (fun () ->
        Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
            let present = Hashtbl.mem pool.endpoints endpoint in
            if present then Hashtbl.remove pool.endpoints endpoint;
            present))
  in
  if not removed then
    Log.debug (fun m ->
        m "No endpoint pool found for %a" Endpoint.pp endpoint)