TCP/TLS connection pooling for Eio
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)

let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"

module Log = (val Logs.src_log src : Logs.LOG)

(* Re-export submodules *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd

(** {1 Error Types} *)

type error =
  | Dns_resolution_failed of { hostname : string }
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
  | Invalid_config of string
  | Invalid_endpoint of string

exception Pool_error of error

let pp_error ppf = function
  | Dns_resolution_failed { hostname } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
  | Connection_failed { endpoint; attempts; last_error } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
        endpoint attempts last_error
  | Connection_timeout { endpoint; timeout } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
        timeout
  | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
  | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg

type Eio.Exn.err += E of error

let err e = Eio.Exn.create (E e)

let () =
  Eio.Exn.register_pp (fun f -> function
    | E e ->
        Fmt.string f "Conpool ";
        pp_error f e;
        true
    | _ -> false)

(** {1 Connection Types} *)

type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]
type connection = connection_ty Eio.Resource.t

(* Mutable per-endpoint counters; always updated under the owning
   [endpoint_pool.mutex]. *)
type endp_stats = {
  mutable active : int;
  mutable idle : int;
  mutable total_created : int;
  mutable total_reused : int;
  mutable total_closed : int;
  mutable errors : int;
}

type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;
  stats : endp_stats;
  mutex : Eio.Mutex.t;  (** Guards [stats]. *)
}

(* Hash table keyed with [Endpoint.equal]/[Endpoint.hash] rather than the
   polymorphic structural equality and hash of [Stdlib.Hashtbl]. *)
module EndpointTbl = Hashtbl.Make (struct
  type t = Endpoint.t

  let equal = Endpoint.equal
  let hash = Endpoint.hash
end)

type ('clock, 'net) internal = {
  sw : Eio.Switch.t;
      (** Pool-lifetime switch: sockets and connection-holder fibers live here. *)
  net : 'net;
  clock : 'clock;
  config : Config.t;
  tls : Tls.Config.client option;
  endpoints : endpoint_pool EndpointTbl.t;
  endpoints_mutex : Eio.Mutex.t;  (** Guards [endpoints]. *)
}

type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t

let get_time (pool : ('clock, 'net) internal) = Eio.Time.now pool.clock

let create_endp_stats () =
  {
    active = 0;
    idle = 0;
    total_created = 0;
    total_reused = 0;
    total_closed = 0;
    errors = 0;
  }

let snapshot_stats (stats : endp_stats) : Stats.t =
  Stats.make ~active:stats.active ~idle:stats.idle
    ~total_created:stats.total_created ~total_reused:stats.total_reused
    ~total_closed:stats.total_closed ~errors:stats.errors

(** {1 DNS Resolution} *)

(** [resolve_endpoint pool endpoint] returns the first stream address that
    [Eio.Net.getaddrinfo_stream] yields for the endpoint's host and port.
    @raise Pool_error [Dns_resolution_failed] if no address is returned. *)
let resolve_endpoint (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
  let addrs =
    Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
      ~service:(string_of_int (Endpoint.port endpoint))
  in
  Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
  match addrs with
  | addr :: _ ->
      Log.debug (fun m ->
          m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
      addr
  | [] ->
      Log.err (fun m ->
          m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
      raise
        (Pool_error
           (Dns_resolution_failed { hostname = Endpoint.host endpoint }))

(** {1 Connection Creation with Retry} *)

(** [backoff_delay pool attempt] is the pause after failed attempt number
    [attempt] (1-based): the configured base delay doubled for every earlier
    attempt. *)
let backoff_delay (pool : ('clock, 'net) internal) attempt =
  Config.connect_retry_delay pool.config *. (2.0 ** float_of_int (attempt - 1))

(** [establish pool endpoint] makes a single connection attempt: resolve the
    endpoint, open a TCP connection on the pool switch (bounded by
    [Config.connect_timeout] when set), then run the TLS handshake if the pool
    has a TLS config. Any exception from these steps propagates. *)
let establish (pool : ('clock, 'net) internal) endpoint =
  let addr = resolve_endpoint pool endpoint in
  let socket =
    match Config.connect_timeout pool.config with
    | Some timeout ->
        Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
            Eio.Net.connect ~sw:pool.sw pool.net addr)
    | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
  in
  Log.debug (fun m ->
      m "TCP connection established to %a" Endpoint.pp endpoint);
  let flow =
    match pool.tls with
    | None -> (socket :> connection)
    | Some tls_config -> (
        Log.debug (fun m ->
            m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
        try
          let host =
            Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
          in
          let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in
          Log.info (fun m ->
              m "TLS connection established to %a" Endpoint.pp endpoint);
          (tls_flow :> connection)
        with e ->
          (* The socket is attached to the long-lived pool switch; close it
             now so failed attempts (and their retries) don't accumulate
             open sockets until the pool shuts down. *)
          let bt = Printexc.get_raw_backtrace () in
          Eio.Cancel.protect (fun () ->
              try Eio.Flow.close socket with _ -> ());
          Printexc.raise_with_backtrace e bt)
  in
  let now = get_time pool in
  Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
  {
    Connection.flow;
    created_at = now;
    last_used = now;
    use_count = 0;
    endpoint;
    mutex = Eio.Mutex.create ();
  }

(** [create_connection_with_retry pool endpoint attempt last_error] calls
    [establish], retrying with exponential backoff until
    [Config.connect_retry_count] attempts have been made. [last_error]
    describes the previous failure and is only reported if no attempt is
    allowed at all.

    Cancellation is re-raised immediately; it is never retried or wrapped.

    @raise Pool_error [Connection_timeout] if the final attempt timed out and
      a connect timeout is configured.
    @raise Pool_error [Connection_failed] for any other final failure. *)
let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint
    attempt last_error =
  let retry_count = Config.connect_retry_count pool.config in
  if attempt > retry_count then begin
    Log.err (fun m ->
        m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
          retry_count);
    raise
      (Pool_error
         (Connection_failed { endpoint; attempts = retry_count; last_error }))
  end;
  Log.debug (fun m ->
      m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
        retry_count);
  let retry error_msg =
    Eio.Time.sleep pool.clock (backoff_delay pool attempt);
    create_connection_with_retry pool endpoint (attempt + 1) error_msg
  in
  match establish pool endpoint with
  | conn -> conn
  | exception (Eio.Cancel.Cancelled _ as e) -> raise e
  | exception (Eio.Time.Timeout as e) ->
      Log.warn (fun m ->
          m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
      let error_msg = Printexc.to_string e in
      if attempt < retry_count then retry error_msg
      else (
        (* Last attempt - convert to our error type *)
        match Config.connect_timeout pool.config with
        | Some timeout ->
            raise (Pool_error (Connection_timeout { endpoint; timeout }))
        | None ->
            raise
              (Pool_error
                 (Connection_failed
                    { endpoint; attempts = attempt; last_error = error_msg })))
  | exception e ->
      let error_msg = Printexc.to_string e in
      Log.warn (fun m ->
          m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
            endpoint error_msg);
      if attempt < retry_count then retry error_msg
      else
        raise
          (Pool_error
             (Connection_failed
                { endpoint; attempts = attempt; last_error = error_msg }))

let create_connection (pool : ('clock, 'net) internal) endpoint =
  create_connection_with_retry pool endpoint 1 "No attempts made"

(** {1 Connection Validation} *)

(** [passes_health_check pool conn] runs the configured custom health check
    on [conn]'s flow. It returns [true] when no check is configured. A check
    that returns [false] or raises (other than cancellation) counts as a
    failure. *)
let passes_health_check (pool : ('clock, 'net) internal) conn =
  match Config.health_check pool.config with
  | None -> true
  | Some check -> (
      match check (Connection.flow conn) with
      | true -> true
      | false ->
          Log.debug (fun m ->
              m "Connection to %a failed custom health check" Endpoint.pp
                (Connection.endpoint conn));
          false
      | exception (Eio.Cancel.Cancelled _ as e) -> raise e
      | exception e ->
          Log.debug (fun m ->
              m "Connection to %a health check raised exception: %s"
                Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
          false)

(** [is_healthy pool ?check_readable conn] decides whether [conn] may be
    reused. It checks, in order: age against [max_connection_lifetime], idle
    time against [max_idle_time], use count against [max_connection_uses],
    and the optional custom health check. [check_readable] is currently a
    placeholder that always passes. *)
let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn =
  let now = get_time pool in
  let endpoint = Connection.endpoint conn in
  let age = now -. Connection.created_at conn in
  let idle_time = now -. Connection.last_used conn in
  let max_lifetime = Config.max_connection_lifetime pool.config in
  let max_idle = Config.max_idle_time pool.config in
  let uses_exhausted =
    match Config.max_connection_uses pool.config with
    | Some limit -> Connection.use_count conn >= limit
    | None -> false
  in
  if age > max_lifetime then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
          Endpoint.pp endpoint age max_lifetime);
    false
  end
  else if idle_time > max_idle then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
          Endpoint.pp endpoint idle_time max_idle);
    false
  end
  else if uses_exhausted then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max use count (%d)"
          Endpoint.pp endpoint (Connection.use_count conn));
    false
  end
  else if not (passes_health_check pool conn) then false
  else if check_readable then
    (* TODO avsm: a sockopt for this? Until then readability is assumed. *)
    true
  else begin
    Log.debug (fun m ->
        m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
          Endpoint.pp endpoint age idle_time (Connection.use_count conn));
    true
  end

(** {1 Internal Pool Operations} *)

(** [close_internal pool conn] closes [conn]'s flow (best effort, shielded
    from cancellation) and then calls the [on_connection_closed] hook. *)
let close_internal (pool : ('clock, 'net) internal) conn =
  Log.debug (fun m ->
      m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
        (Connection.endpoint conn)
        (get_time pool -. Connection.created_at conn)
        (Connection.use_count conn));
  Eio.Cancel.protect (fun () ->
      try Eio.Flow.close (Connection.flow conn)
      with e ->
        (* Closing is best effort, but don't lose the reason entirely. *)
        Log.debug (fun m ->
            m "Error while closing connection to %a: %s" Endpoint.pp
              (Connection.endpoint conn) (Printexc.to_string e)));
  Option.iter
    (fun f -> f (Connection.endpoint conn))
    (Config.on_connection_closed pool.config)

(** [make_endpoint_pool pool endpoint] builds a fresh [Eio.Pool] for
    [endpoint], sized by [max_connections_per_endpoint]. It also wires in
    validation (health checks, reuse hook), disposal (close, close count) and
    creation (connect with retry, create hook, create/error counts). *)
let make_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  let stats = create_endp_stats () in
  let mutex = Eio.Mutex.create () in
  Log.info (fun m ->
      m "Creating new endpoint pool for %a (max_connections=%d)" Endpoint.pp
        endpoint
        (Config.max_connections_per_endpoint pool.config));
  let validate conn =
    Log.debug (fun m ->
        m "Validate called for connection to %a" Endpoint.pp endpoint);
    (* [is_healthy] already runs the custom health check. A second run here
       would invoke it twice, and could reject a connection after it had
       already been counted as reused. *)
    if is_healthy pool ~check_readable:false conn then begin
      Log.debug (fun m ->
          m "Reusing connection to %a from pool" Endpoint.pp endpoint);
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.total_reused <- stats.total_reused + 1);
      Option.iter (fun f -> f endpoint) (Config.on_connection_reused pool.config);
      true
    end
    else begin
      Log.debug (fun m ->
          m "Connection to %a failed validation, creating new one" Endpoint.pp
            endpoint);
      false
    end
  in
  let dispose conn =
    Eio.Cancel.protect (fun () ->
        close_internal pool conn;
        Eio.Mutex.use_rw ~protect:true mutex (fun () ->
            stats.total_closed <- stats.total_closed + 1))
  in
  let alloc () =
    Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
    try
      let conn = create_connection pool endpoint in
      Log.debug (fun m ->
          m "Connection created successfully for %a" Endpoint.pp endpoint);
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.total_created <- stats.total_created + 1);
      Option.iter
        (fun f -> f endpoint)
        (Config.on_connection_created pool.config);
      conn
    with e ->
      Log.err (fun m ->
          m "Factory function failed for %a: %s" Endpoint.pp endpoint
            (Printexc.to_string e));
      Eio.Mutex.use_rw ~protect:true mutex (fun () ->
          stats.errors <- stats.errors + 1);
      raise e
  in
  let eio_pool =
    Eio.Pool.create ~validate ~dispose
      (Config.max_connections_per_endpoint pool.config)
      alloc
  in
  { pool = eio_pool; stats; mutex }

(** [get_or_create_endpoint_pool pool endpoint] returns the endpoint's pool,
    creating and registering it on first use. [Eio.Mutex] has no shared read
    mode, so lookup and insertion are done in a single critical section. *)
let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint =
  Log.debug (fun m ->
      m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
  Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
      match EndpointTbl.find_opt pool.endpoints endpoint with
      | Some ep_pool ->
          Log.debug (fun m ->
              m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
          ep_pool
      | None ->
          let ep_pool = make_endpoint_pool pool endpoint in
          EndpointTbl.replace pool.endpoints endpoint ep_pool;
          Log.debug (fun m ->
              m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
          ep_pool)

(** {1 Public API - Pool Creation} *)

(** [create ~sw ~net ~clock ?tls ?config ()] makes a pool whose sockets and
    connection-holder fibers are attached to [sw]. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) () : t =
  Log.info (fun m ->
      m
        "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
         max_lifetime=%.1fs)"
        (Config.max_connections_per_endpoint config)
        (Config.max_idle_time config)
        (Config.max_connection_lifetime config));
  let pool =
    {
      sw;
      net;
      clock;
      config;
      tls;
      endpoints = EndpointTbl.create 16;
      endpoints_mutex = Eio.Mutex.create ();
    }
  in
  (* Sockets are opened with [~sw], so the switch itself closes them when it
     finishes. Here we only drop the per-endpoint pools. *)
  Eio.Switch.on_release sw (fun () ->
      Eio.Cancel.protect (fun () ->
          Log.info (fun m -> m "Closing connection pool");
          EndpointTbl.clear pool.endpoints));
  T pool

(** {1 Public API - Connection Management} *)

(** [connection_internal ~sw t endpoint] borrows a pooled connection for the
    lifetime of [sw] and returns its flow.

    A holder fiber is forked on the {e pool's} switch and keeps the
    connection checked out of [Eio.Pool]. It returns the connection when
    [sw] is released. Forking it as a daemon on the caller's [sw] would not
    work: Eio cancels daemons before running [on_release] hooks, so the
    connection would never be returned cleanly.

    Acquisition errors (e.g. [Pool_error]) are raised to the caller rather
    than failing [sw]. *)
let connection_internal ~sw (T pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = get_or_create_endpoint_pool pool endpoint in
  let conn_promise, conn_resolver = Eio.Promise.create () in
  let done_promise, done_resolver = Eio.Promise.create () in
  (* Registered first: if the caller is cancelled while waiting below, the
     holder still learns it may give the connection back. *)
  Eio.Switch.on_release sw (fun () -> Eio.Promise.resolve done_resolver ());
  Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
      ep_pool.stats.active <- ep_pool.stats.active + 1);
  Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
      (match
         Eio.Pool.use ep_pool.pool (fun conn ->
             Log.debug (fun m ->
                 m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
                   (Connection.use_count conn));
             Connection.update_usage conn ~now:(get_time pool);
             (* Connection taken from the idle set *)
             Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
                 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
             Eio.Promise.resolve conn_resolver (Ok (Connection.flow conn));
             Eio.Promise.await done_promise;
             (* Returning normally hands the connection back to Eio.Pool *)
             Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
                 ep_pool.stats.idle <- ep_pool.stats.idle + 1))
       with
      | () -> ()
      | exception e ->
          (* Acquisition failed (factory error counted by the pool) or the
             pool switch is shutting down. Never let this fail the pool
             switch; hand the error to the caller if it is still waiting. *)
          Log.warn (fun m ->
              m "Error with connection to %a: %s" Endpoint.pp endpoint
                (Printexc.to_string e));
          ignore (Eio.Promise.try_resolve conn_resolver (Error e) : bool));
      Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
          ep_pool.stats.active <- ep_pool.stats.active - 1);
      Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint);
      `Stop_daemon);
  match Eio.Promise.await conn_promise with
  | Ok flow -> flow
  | Error e -> raise e

let connection ~sw t endpoint = connection_internal ~sw t endpoint

let with_connection t endpoint f =
  Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))

let with_connection_exn t endpoint f =
  try with_connection t endpoint f with Pool_error e -> raise (err e)

(** {1 Public API - Statistics} *)

(** [stats t endpoint] snapshots the counters for [endpoint]. It returns all
    zeros if no pool exists for it yet. *)
let stats (T pool) endpoint =
  let ep_pool =
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
        EndpointTbl.find_opt pool.endpoints endpoint)
  in
  match ep_pool with
  | Some ep_pool ->
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None -> snapshot_stats (create_endp_stats ())

let all_stats (T pool) =
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      EndpointTbl.fold
        (fun endpoint ep_pool acc ->
          let stats =
            Eio.Mutex.use_ro ep_pool.mutex (fun () ->
                snapshot_stats ep_pool.stats)
          in
          (endpoint, stats) :: acc)
        pool.endpoints [])

(** {1 Public API - Pool Management} *)

(** [clear_endpoint t endpoint] forgets the pool for [endpoint]. Later
    requests create a fresh pool. Connections belonging to the removed pool
    are not closed here; their sockets stay attached to the pool switch and
    are closed when it finishes. *)
let clear_endpoint (T pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  let removed =
    Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
        if EndpointTbl.mem pool.endpoints endpoint then begin
          EndpointTbl.remove pool.endpoints endpoint;
          true
        end
        else false)
  in
  if not removed then
    Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)