TCP/TLS connection pooling for Eio
at main 20 kB view raw
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)

(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)

let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"

module Log = (val Logs.src_log src : Logs.LOG)

(* Re-export submodules *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd

(** {1 Error Types} *)

(** Error codes carried by the {!E} extension of [Eio.Exn.err]. *)
type error =
  | Dns_resolution_failed of { hostname : string }
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
  | Invalid_config of string
  | Invalid_endpoint of string

(** [pp_error ppf e] prints a human-readable description of [e] on [ppf]. *)
let pp_error ppf = function
  | Dns_resolution_failed { hostname } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
  | Connection_failed { endpoint; attempts; last_error } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
        endpoint attempts last_error
  | Connection_timeout { endpoint; timeout } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
        timeout
  | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
  | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg

type Eio.Exn.err += E of error

(** [err e] wraps [e] into an [Eio.Io] exception value (not yet raised). *)
let err e = Eio.Exn.create (E e)

(* Teach Eio's exception printer how to render our error codes. *)
let () =
  Eio.Exn.register_pp (fun f -> function
    | E e ->
        Fmt.string f "Conpool ";
        pp_error f e;
        true
    | _ -> false)

(** {1 Connection Types} *)

type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]
type connection = connection_ty Eio.Resource.t

(** Mutable per-endpoint counters. In this file they are always read and
    written while holding the owning {!endpoint_pool}'s [mutex]. *)
type endp_stats = {
  mutable active : int;
  mutable idle : int;
  mutable total_created : int;
  mutable total_reused : int;
  mutable total_closed : int;
  mutable errors : int;
}

(** The pool of connections for one endpoint together with its counters and
    the mutex guarding those counters. *)
type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;
  stats : endp_stats;
  mutex : Eio.Mutex.t;
}

(** Internal pool state. [endpoints] is guarded by [endpoints_mutex]. *)
type ('clock, 'net) internal = {
  sw : Eio.Switch.t;
  net : 'net;
  clock : 'clock;
  config : Config.t;
  tls : Tls.Config.client option;
  endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
  endpoints_mutex : Eio.Mutex.t;
}

(** A connection pool; the clock and network types are hidden existentially. *)
type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t

(* NOTE(review): this specialised table is not used below; [internal.endpoints]
   is a polymorphic [Hashtbl.t]. Kept in case other code refers to it. *)
module EndpointTbl = Hashtbl.Make (struct
  type t = Endpoint.t

  let equal = Endpoint.equal
  let hash = Endpoint.hash
end)

(** Current time according to the pool's clock. *)
let get_time (pool : ('clock, 'net) internal) = Eio.Time.now pool.clock

(** Fresh counters, all zero. *)
let create_endp_stats () =
  {
    active = 0;
    idle = 0;
    total_created = 0;
    total_reused = 0;
    total_closed = 0;
    errors = 0;
  }

(** Copy the mutable counters into an immutable {!Stats.t}. The caller is
    expected to hold the mutex guarding [stats]. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  Stats.make ~active:stats.active ~idle:stats.idle
    ~total_created:stats.total_created ~total_reused:stats.total_reused
    ~total_closed:stats.total_closed ~errors:stats.errors

(** {1 DNS Resolution} *)

(** [resolve_endpoint pool endpoint] looks up the endpoint's host and port and
    returns the first address reported by the resolver.

    @raise Eio.Io
      with code [E (Dns_resolution_failed _)] when the resolver returns no
      address. Any [Eio.Io] error raised here gets a "resolving ..." context. *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m -> m "Resolving %a" Endpoint.pp endpoint);
  try
    let addrs =
      Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
        ~service:(string_of_int (Endpoint.port endpoint))
    in
    match addrs with
    | addr :: _ ->
        Log.debug (fun m ->
            m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
        addr
    | [] ->
        (* Raise exception with error code - context is added by the handler
           just below. *)
        raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
  with Eio.Io _ as ex ->
    let bt = Printexc.get_raw_backtrace () in
    Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint

(** {1 Connection Creation with Retry} *)

(** Delay to sleep after failed attempt number [attempt] (1-based): the
    configured retry delay doubled for every attempt already made. *)
let retry_backoff_delay config ~attempt =
  Config.connect_retry_delay config *. (2.0 ** float_of_int (attempt - 1))

(** [create_connection_with_retry pool endpoint attempt last_error] resolves
    [endpoint], opens a TCP connection (bounded by the configured connect
    timeout, if any) and, when the pool has a TLS configuration, performs the
    TLS handshake. On [Eio.Time.Timeout] or [Eio.Io] it sleeps with
    exponential backoff and retries until [attempt] reaches the configured
    retry count.

    [attempt] is the 1-based number of the attempt about to be made and
    [last_error] describes the previous failure.

    @raise Eio.Io
      [Connection_failed] if [attempt] already exceeds the retry count,
      [Connection_timeout] (or [Connection_failed] when no timeout is
      configured) if the last attempt timed out, or the last [Eio.Io] error
      with an "after N retry attempts" context. *)
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
    attempt last_error =
  let retry_count = Config.connect_retry_count pool.config in
  if attempt > retry_count then
    (* Raise exception with error code - context will be added when caught *)
    raise
      (err (Connection_failed { endpoint; attempts = retry_count; last_error }));

  Log.debug (fun m ->
      m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
        retry_count);

  try
    let addr = resolve_endpoint pool endpoint in

    (* Connect with optional timeout *)
    let socket =
      try
        match Config.connect_timeout pool.config with
        | Some timeout ->
            Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
                Eio.Net.connect ~sw:pool.sw pool.net addr)
        | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
      with Eio.Io _ as ex ->
        let bt = Printexc.get_raw_backtrace () in
        Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp
          endpoint
    in

    Log.debug (fun m ->
        m "TCP connection established to %a" Endpoint.pp endpoint);

    let flow =
      match pool.tls with
      | None -> (socket :> connection)
      | Some tls_config -> (
          try
            Log.debug (fun m ->
                m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
            let host =
              Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
            in
            let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
            Log.info (fun m ->
                m "TLS connection established to %a" Endpoint.pp endpoint);
            (tls_flow :> connection)
          with ex -> (
            let bt = Printexc.get_raw_backtrace () in
            (* The socket is attached to [pool.sw]; without an explicit close
               every failed TLS setup would keep a socket open until the whole
               pool is released. Closing is best-effort. *)
            Eio.Cancel.protect (fun () ->
                try Eio.Flow.close (socket :> connection) with _ -> ());
            match ex with
            | Eio.Io _ ->
                Eio.Exn.reraise_with_context ex bt "TLS handshake with %a"
                  Endpoint.pp endpoint
            | _ -> Printexc.raise_with_backtrace ex bt))
    in

    let now = get_time pool in
    Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
    {
      Connection.flow;
      created_at = now;
      last_used = now;
      use_count = 0;
      endpoint;
      mutex = Eio.Mutex.create ();
    }
  with
  | Eio.Time.Timeout ->
      Log.warn (fun m ->
          m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
      if attempt >= retry_count then (
        (* Last attempt - convert to our error type *)
        match Config.connect_timeout pool.config with
        | Some timeout -> raise (err (Connection_timeout { endpoint; timeout }))
        | None ->
            raise
              (err
                 (Connection_failed
                    { endpoint; attempts = attempt; last_error = "Timeout" })))
      else begin
        (* Retry with exponential backoff *)
        Eio.Time.sleep pool.clock (retry_backoff_delay pool.config ~attempt);
        create_connection_with_retry pool endpoint (attempt + 1) "Timeout"
      end
  | Eio.Io _ as ex ->
      (* Capture the backtrace before anything else can overwrite it. *)
      let bt = Printexc.get_raw_backtrace () in
      (* Eio IO errors - retry with backoff and add context on final failure *)
      let error_msg = Printexc.to_string ex in
      Log.warn (fun m ->
          m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
            endpoint error_msg);
      if attempt < retry_count then begin
        Eio.Time.sleep pool.clock (retry_backoff_delay pool.config ~attempt);
        create_connection_with_retry pool endpoint (attempt + 1) error_msg
      end
      else Eio.Exn.reraise_with_context ex bt "after %d retry attempts" attempt

(** [create_connection pool endpoint] starts the retry loop at attempt 1. *)
let create_connection (pool : ('clock, 'net) internal) endpoint =
  create_connection_with_retry pool endpoint 1 "No attempts made"

(** {1 Connection Validation} *)

(** [is_healthy pool ?check_readable conn] decides whether [conn] may be
    reused. The checks run in order and the first failure wins:
    - age above [Config.max_connection_lifetime];
    - time since last use above [Config.max_idle_time];
    - use count at or above [Config.max_connection_uses], when set;
    - the custom [Config.health_check], when set; a check that returns
      [false] or raises marks the connection unhealthy.

    [check_readable] is reserved for a socket-level liveness probe that is not
    implemented yet; when [true] the function currently just returns [true]
    once the checks above have passed. *)
let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn =
  let now = get_time pool in
  let age = now -. Connection.created_at conn in
  let idle_time = now -. Connection.last_used conn in
  let max_lifetime = Config.max_connection_lifetime pool.config in
  let max_idle = Config.max_idle_time pool.config in
  let exceeded_uses () =
    match Config.max_connection_uses pool.config with
    | Some max_uses -> Connection.use_count conn >= max_uses
    | None -> false
  in
  let passes_custom_check () =
    match Config.health_check pool.config with
    | None -> true
    | Some check -> (
        match check (Connection.flow conn) with
        | true -> true
        | false ->
            Log.debug (fun m ->
                m "Connection to %a failed custom health check" Endpoint.pp
                  (Connection.endpoint conn));
            false
        | exception e ->
            (* Exception in health check = unhealthy *)
            Log.debug (fun m ->
                m "Connection to %a health check raised exception: %s"
                  Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
            false)
  in
  if age > max_lifetime then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
          Endpoint.pp (Connection.endpoint conn) age max_lifetime);
    false
  end
  else if idle_time > max_idle then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
          Endpoint.pp (Connection.endpoint conn) idle_time max_idle);
    false
  end
  else if exceeded_uses () then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max use count (%d)" Endpoint.pp
          (Connection.endpoint conn)
          (Connection.use_count conn));
    false
  end
  else if not (passes_custom_check ()) then false
  else if check_readable then
    (* TODO avsm: a sockopt for this? *)
    true
  else begin
    Log.debug (fun m ->
        m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
          Endpoint.pp (Connection.endpoint conn) age idle_time
          (Connection.use_count conn));
    true
  end

(** {1 Internal Pool Operations} *)

(** [close_internal pool conn] closes the connection's flow (best-effort,
    protected from cancellation) and then calls the configured
    [on_connection_closed] hook, if any. *)
let close_internal (pool : ('clock, 'net) internal) conn =
  Log.debug (fun m ->
      m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
        (Connection.endpoint conn)
        (get_time pool -. Connection.created_at conn)
        (Connection.use_count conn));

  Eio.Cancel.protect (fun () ->
      try Eio.Flow.close (Connection.flow conn)
      with ex ->
        (* Closing is best-effort: record the failure but do not propagate. *)
        Log.debug (fun m ->
            m "Ignoring error while closing connection: %s"
              (Printexc.to_string ex)));

  (* Call hook if configured *)
  Option.iter
    (fun f -> f (Connection.endpoint conn))
    (Config.on_connection_closed pool.config)

(** Build a fresh {!endpoint_pool} for [endpoint]: zeroed counters, a new
    mutex, and an [Eio.Pool] that creates connections with
    {!create_connection}, validates them with {!is_healthy} and disposes of
    them with {!close_internal}. Does not register the pool anywhere. *)
let make_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  let stats = create_endp_stats () in
  let mutex = Eio.Mutex.create () in
  let max_connections = Config.max_connections_per_endpoint pool.config in

  Log.info (fun m ->
      m "Creating endpoint pool for %a (max_connections=%d)" Endpoint.pp
        endpoint max_connections);

  let validate conn =
    (* [is_healthy] already runs the configured custom health check, so it is
       not repeated here; reuse is only counted for connections that pass. *)
    let healthy = is_healthy pool ~check_readable:false conn in
    if healthy then begin
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.total_reused <- stats.total_reused + 1);
      Option.iter
        (fun f -> f endpoint)
        (Config.on_connection_reused pool.config)
    end;
    healthy
  in
  let dispose conn =
    (* Called when removing from pool *)
    Eio.Cancel.protect (fun () ->
        close_internal pool conn;
        Eio.Mutex.use_rw ~protect:true mutex (fun () ->
            stats.total_closed <- stats.total_closed + 1))
  in
  let make () =
    try
      let conn = create_connection pool endpoint in
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.total_created <- stats.total_created + 1);
      Option.iter
        (fun f -> f endpoint)
        (Config.on_connection_created pool.config);
      conn
    with Eio.Io _ as ex ->
      (* Eio.Io exceptions already have full context from create_connection.
         Just update error stats and let the exception propagate. *)
      let bt = Printexc.get_raw_backtrace () in
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.errors <- stats.errors + 1);
      Printexc.raise_with_backtrace ex bt
  in
  let eio_pool = Eio.Pool.create max_connections ~validate ~dispose make in
  { pool = eio_pool; stats; mutex }

(** [get_or_create_endpoint_pool pool endpoint] returns the pool registered
    for [endpoint], creating and registering one if there is none. The table
    is looked up under [endpoints_mutex]; on a miss the lock is taken again
    and the lookup repeated before inserting, in case another fiber created
    the pool in between. *)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  let lookup () = Hashtbl.find_opt pool.endpoints endpoint in
  match Eio.Mutex.use_ro pool.endpoints_mutex lookup with
  | Some ep_pool -> ep_pool
  | None ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          (* Check again in case another fiber created it *)
          match lookup () with
          | Some ep_pool -> ep_pool
          | None ->
              let ep_pool = make_endpoint_pool pool endpoint in
              Hashtbl.replace pool.endpoints endpoint ep_pool;
              ep_pool)

(** {1 Public API - Pool Creation} *)

(** [create ~sw ~net ~clock ?tls ?config ()] makes an empty connection pool.
    Connections are opened under [sw]; when [sw] is released the endpoint
    table is cleared. [config] defaults to [Config.default]; when [tls] is
    given every new connection performs a TLS handshake. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) () : t =
  Log.info (fun m ->
      m
        "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
         max_lifetime=%.1fs)"
        (Config.max_connections_per_endpoint config)
        (Config.max_idle_time config)
        (Config.max_connection_lifetime config));

  let pool =
    {
      sw;
      net;
      clock;
      config;
      tls;
      endpoints = Hashtbl.create 16;
      endpoints_mutex = Eio.Mutex.create ();
    }
  in

  (* Auto-cleanup on switch release. Sockets are opened with [~sw] and are
     therefore tied to the switch; only the table needs to be emptied. *)
  Eio.Switch.on_release sw (fun () ->
      Eio.Cancel.protect (fun () ->
          Log.info (fun m -> m "Closing connection pool");
          Hashtbl.clear pool.endpoints));

  T pool

(** {1 Public API - Connection Management} *)

(** [connection_internal ~sw t endpoint] acquires a connection to [endpoint]
    and returns its flow. The connection stays checked out until [sw] is
    released, at which point it goes back to the endpoint's pool.

    The acquisition runs in a daemon fiber under the pool's own switch. If it
    fails, the exception is delivered to the caller and re-raised here; the
    daemon itself stops quietly so that the pool's switch is not affected. *)
let connection_internal ~sw (T pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = get_or_create_endpoint_pool pool endpoint in

  (* [conn_promise] hands the flow (or the acquisition error) to the caller;
     [done_promise] tells the daemon that the caller has finished. *)
  let conn_promise, conn_resolver = Eio.Promise.create () in
  let done_promise, done_resolver = Eio.Promise.create () in

  (* Increment active count *)
  Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
      ep_pool.stats.active <- ep_pool.stats.active + 1);

  let release_active () =
    Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
        ep_pool.stats.active <- ep_pool.stats.active - 1);
    Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)
  in

  let hold_connection conn =
    Log.debug (fun m ->
        m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
          (Connection.use_count conn));

    (* Update last used time and use count *)
    Connection.update_usage conn ~now:(get_time pool);

    (* Update idle stats (connection taken from idle pool) *)
    Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
        ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));

    (* Hand off connection to caller *)
    Eio.Promise.resolve_ok conn_resolver (Connection.flow conn);

    try
      (* Wait for the caller's switch to signal cleanup *)
      Eio.Promise.await done_promise;

      (* Success - connection will be returned to pool by Eio.Pool *)
      Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
          ep_pool.stats.idle <- ep_pool.stats.idle + 1)
    with e ->
      (* Failure while holding the connection - close it so it won't be
         reused, count the error and let the exception continue. *)
      let bt = Printexc.get_raw_backtrace () in
      close_internal pool conn;
      Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
          ep_pool.stats.errors <- ep_pool.stats.errors + 1);
      Printexc.raise_with_backtrace e bt
  in

  (* Fork a daemon fiber to manage the connection lifecycle.
     Important: Fork under pool.sw, not the caller's sw, so the daemon
     survives when the caller's switch ends and can return the connection
     to the pool for reuse. *)
  Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
      Fun.protect ~finally:release_active (fun () ->
          match Eio.Pool.use ep_pool.pool hold_connection with
          | () -> `Stop_daemon
          | exception ex when not (Eio.Promise.is_resolved conn_promise) -> (
              (* Nothing was handed to the caller: report the failure through
                 the promise so the caller does not wait forever. *)
              let bt = Printexc.get_raw_backtrace () in
              Eio.Promise.resolve_error conn_resolver ex;
              match ex with
              | Eio.Cancel.Cancelled _ -> Printexc.raise_with_backtrace ex bt
              | _ -> `Stop_daemon)));

  (* Signal cleanup when switch ends *)
  Eio.Switch.on_release sw (fun () -> Eio.Promise.resolve done_resolver ());

  (* Return the connection, or re-raise the acquisition error *)
  Eio.Promise.await_exn conn_promise

(** [connection ~sw t endpoint] returns a pooled connection to [endpoint] that
    is handed back to the pool when [sw] is released. *)
let connection ~sw t endpoint = connection_internal ~sw t endpoint

(** [with_connection t endpoint f] runs [f] on a pooled connection inside a
    fresh switch, so the connection is handed back when [f] finishes. *)
let with_connection t endpoint f =
  Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))

(** {1 Public API - Statistics} *)

(** [stats t endpoint] returns a snapshot of the counters for [endpoint], or
    all-zero statistics when no pool exists for it yet. *)
let stats (T pool) endpoint =
  let found =
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
        Hashtbl.find_opt pool.endpoints endpoint)
  in
  match found with
  | Some ep_pool ->
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None ->
      (* No pool for this endpoint yet *)
      Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
        ~total_closed:0 ~errors:0

(** [all_stats t] returns a snapshot of the counters of every endpoint that
    currently has a pool, as [(endpoint, stats)] pairs in unspecified order. *)
let all_stats (T pool) =
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.fold
        (fun endpoint ep_pool acc ->
          let stats =
            Eio.Mutex.use_ro ep_pool.mutex (fun () ->
                snapshot_stats ep_pool.stats)
          in
          (endpoint, stats) :: acc)
        pool.endpoints [])

(** {1 Public API - Pool Management} *)

(** [clear_endpoint t endpoint] removes the pool registered for [endpoint]
    from the table, if there is one; a later request for the same endpoint
    creates a new pool.

    NOTE(review): nothing here closes connections still held by the removed
    [Eio.Pool]; they appear to stay open until the pool's switch is released.
    Confirm whether that is intended. *)
let clear_endpoint (T pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  let removed =
    Eio.Cancel.protect (fun () ->
        Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
            let present = Hashtbl.mem pool.endpoints endpoint in
            if present then Hashtbl.remove pool.endpoints endpoint;
            present))
  in
  if not removed then
    Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)