My agentic slop goes here. Not intended for anyone else!
at jsont 22 kB view raw
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)

let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"

module Log = (val Logs.src_log src : Logs.LOG)

(* The bidirectional, closable flow handed out for every pooled connection,
   whether it is a plain TCP socket or a TLS session layered on one. *)
type conn_flow = [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t

(** A remote [host:port] pair; connections are pooled per endpoint. *)
module Endpoint = struct
  type t = { host : string; port : int }

  let make ~host ~port = { host; port }
  let host t = t.host
  let port t = t.port
  let pp fmt t = Format.fprintf fmt "%s:%d" t.host t.port
  let equal t1 t2 = String.equal t1.host t2.host && Int.equal t1.port t2.port
  let hash t = Hashtbl.hash (t.host, t.port)
end

(** TLS client settings applied to every connection opened by a pool. *)
module Tls_config = struct
  type t = {
    config : Tls.Config.client;
    servername : string option;
        (* Name used for the TLS [~host] argument; when [None] the
           endpoint's host is used instead. *)
  }

  let make ~config ?servername () = { config; servername }
  let config t = t.config
  let servername t = t.servername

  let pp fmt t =
    Format.fprintf fmt "TLS(servername=%s)"
      (Option.value t.servername ~default:"<default>")
end

(** A live connection plus the bookkeeping used to decide whether it may be
    reused. *)
module Connection = struct
  type t = {
    flow : conn_flow;
    created_at : float;
    mutable last_used : float;
        (* Refreshed when the connection is checked out and again when it is
           returned, so idle time is measured from the end of the last use. *)
    mutable use_count : int;
    mutable closed : bool;
        (* Set by [close_internal]; a closed connection is never validated for
           reuse and is never closed a second time. *)
    endpoint : Endpoint.t;
  }

  let flow t = t.flow
  let endpoint t = t.endpoint
  let created_at t = t.created_at
  let last_used t = t.last_used
  let use_count t = t.use_count
  let is_closed t = t.closed

  let pp fmt t =
    Format.fprintf fmt
      "Connection(endpoint=%a, created=%.2f, last_used=%.2f, uses=%d)"
      Endpoint.pp t.endpoint t.created_at t.last_used t.use_count
end

(** Pool tuning knobs and lifecycle hooks. *)
module Config = struct
  type t = {
    max_connections_per_endpoint : int;
        (* Capacity of the per-endpoint [Eio.Pool]. *)
    max_idle_time : float;
        (* Seconds a pooled connection may sit unused before it is discarded
           instead of reused. *)
    max_connection_lifetime : float;
        (* Seconds since creation after which a connection is discarded. *)
    max_connection_uses : int option;
        (* Connection is discarded once it has been used this many times. *)
    health_check : (conn_flow -> bool) option;
        (* Extra check run before reuse; [false] or an exception rejects. *)
    connect_timeout : float option;
    connect_retry_count : int;
        (* Total connection attempts (at least one is always made). *)
    connect_retry_delay : float;
        (* Base delay for exponential backoff between attempts. *)
    on_connection_created : (Endpoint.t -> unit) option;
    on_connection_closed : (Endpoint.t -> unit) option;
    on_connection_reused : (Endpoint.t -> unit) option;
  }

  (** Build a configuration. Note that [connect_timeout] always ends up as
      [Some _] here (default 10s). *)
  let make ?(max_connections_per_endpoint = 10) ?(max_idle_time = 60.0)
      ?(max_connection_lifetime = 300.0) ?max_connection_uses ?health_check
      ?(connect_timeout = 10.0) ?(connect_retry_count = 3)
      ?(connect_retry_delay = 0.1) ?on_connection_created ?on_connection_closed
      ?on_connection_reused () =
    {
      max_connections_per_endpoint;
      max_idle_time;
      max_connection_lifetime;
      max_connection_uses;
      health_check;
      connect_timeout = Some connect_timeout;
      connect_retry_count;
      connect_retry_delay;
      on_connection_created;
      on_connection_closed;
      on_connection_reused;
    }

  let default = make ()
  let max_connections_per_endpoint t = t.max_connections_per_endpoint
  let max_idle_time t = t.max_idle_time
  let max_connection_lifetime t = t.max_connection_lifetime
  let max_connection_uses t = t.max_connection_uses
  let health_check t = t.health_check
  let connect_timeout t = t.connect_timeout
  let connect_retry_count t = t.connect_retry_count
  let connect_retry_delay t = t.connect_retry_delay

  let pp fmt t =
    Format.fprintf fmt
      "@[<v>Config:@,\
       - max_connections_per_endpoint: %d@,\
       - max_idle_time: %.1fs@,\
       - max_connection_lifetime: %.1fs@,\
       - max_connection_uses: %s@,\
       - connect_timeout: %s@,\
       - connect_retry_count: %d@,\
       - connect_retry_delay: %.2fs@]"
      t.max_connections_per_endpoint t.max_idle_time t.max_connection_lifetime
      (match t.max_connection_uses with
       | Some n -> string_of_int n
       | None -> "unlimited")
      (match t.connect_timeout with
       | Some f -> Printf.sprintf "%.1fs" f
       | None -> "none")
      t.connect_retry_count t.connect_retry_delay
end

(** Immutable snapshot of one endpoint's counters. *)
module Stats = struct
  type t = {
    active : int;
    idle : int;
    total_created : int;
    total_reused : int;
    total_closed : int;
    errors : int;
  }

  let active t = t.active
  let idle t = t.idle
  let total_created t = t.total_created
  let total_reused t = t.total_reused
  let total_closed t = t.total_closed
  let errors t = t.errors

  let pp fmt t =
    Format.fprintf fmt
      "@[<v>Stats:@,\
       - Active: %d@,\
       - Idle: %d@,\
       - Created: %d@,\
       - Reused: %d@,\
       - Closed: %d@,\
       - Errors: %d@]"
      t.active t.idle t.total_created t.total_reused t.total_closed t.errors
end

(* Mutable per-endpoint counters, only touched while holding the owning
   [endpoint_pool.mutex]; exposed to callers via [snapshot_stats]. *)
type endp_stats = {
  mutable active : int;
  mutable idle : int;
  mutable total_created : int;
  mutable total_reused : int;
  mutable total_closed : int;
  mutable errors : int;
}

type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;
  stats : endp_stats;
  mutex : Eio.Mutex.t; (* guards [stats] *)
}

(* Endpoint-keyed table using [Endpoint.equal]/[Endpoint.hash] rather than
   polymorphic hashing. *)
module EndpointTbl = Hashtbl.Make (struct
  type t = Endpoint.t

  let equal = Endpoint.equal
  let hash = Endpoint.hash
end)

type ('clock, 'net) t = {
  sw : Eio.Switch.t; (* every pooled socket is opened on this switch *)
  net : 'net;
  clock : 'clock;
  config : Config.t;
  tls : Tls_config.t option;
  endpoints : endpoint_pool EndpointTbl.t;
  endpoints_mutex : Eio.Mutex.t; (* guards [endpoints] *)
}

let get_time pool = Eio.Time.now pool.clock

let create_endp_stats () =
  {
    active = 0;
    idle = 0;
    total_created = 0;
    total_reused = 0;
    total_closed = 0;
    errors = 0;
  }

let snapshot_stats (stats : endp_stats) : Stats.t =
  {
    active = stats.active;
    idle = stats.idle;
    total_created = stats.total_created;
    total_reused = stats.total_reused;
    total_closed = stats.total_closed;
    errors = stats.errors;
  }

(** {1 DNS Resolution} *)

(* Resolve [endpoint] to its first stream address.
   @raise Failure if the lookup returns no addresses. *)
let resolve_endpoint pool endpoint =
  let host = Endpoint.host endpoint in
  Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
  let addrs =
    Eio.Net.getaddrinfo_stream pool.net host
      ~service:(string_of_int (Endpoint.port endpoint))
  in
  Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
  match addrs with
  | addr :: _ ->
      Log.debug (fun m ->
          m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
      addr
  | [] ->
      Log.err (fun m -> m "Failed to resolve hostname: %s" host);
      failwith (Printf.sprintf "Failed to resolve hostname: %s" host)

(** {1 Connection Creation with Retry} *)

(* Run the client TLS handshake over [socket]. The TLS host name is the
   configured servername, falling back to the endpoint's host. *)
let tls_handshake endpoint tls_cfg socket =
  Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
  let name =
    Option.value (Tls_config.servername tls_cfg)
      ~default:(Endpoint.host endpoint)
  in
  let host = Domain_name.(host_exn (of_string_exn name)) in
  let tls_flow =
    Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket
  in
  Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
  (tls_flow :> conn_flow)

(* One connection attempt: resolve, TCP connect (bounded by
   [connect_timeout] when set) and optionally upgrade to TLS. If the TLS step
   fails, the already-open TCP socket is closed before re-raising so failed
   attempts do not leak sockets on [pool.sw]. *)
let connect_once pool endpoint =
  let addr = resolve_endpoint pool endpoint in
  Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);
  let connect () = Eio.Net.connect ~sw:pool.sw pool.net addr in
  let socket =
    match pool.config.connect_timeout with
    | Some timeout -> Eio.Time.with_timeout_exn pool.clock timeout connect
    | None -> connect ()
  in
  Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
  let flow =
    match pool.tls with
    | None -> (socket :> conn_flow)
    | Some tls_cfg -> (
        match tls_handshake endpoint tls_cfg socket with
        | tls_flow -> tls_flow
        | exception e ->
            let bt = Printexc.get_raw_backtrace () in
            (* Best-effort cleanup; the handshake error is what matters. *)
            Eio.Cancel.protect (fun () ->
                try Eio.Flow.close socket with _ -> ());
            Printexc.raise_with_backtrace e bt)
  in
  let now = get_time pool in
  Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
  {
    Connection.flow;
    created_at = now;
    last_used = now;
    use_count = 0;
    closed = false;
    endpoint;
  }

(* Exponential backoff: [connect_retry_delay * 2^(attempt-1)] seconds. *)
let backoff_delay pool attempt =
  pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1))

(* Try to connect, making up to [max 1 connect_retry_count] attempts in total
   and sleeping with exponential backoff between (never after) attempts.
   Cancellation is propagated immediately and never retried.
   @raise Failure when every attempt timed out.
   @raise e the last attempt's exception for any other failure. *)
let rec create_connection_with_retry pool endpoint attempt =
  let max_attempts = max 1 pool.config.connect_retry_count in
  Log.debug (fun m ->
      m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt
        max_attempts);
  let retry () =
    Eio.Time.sleep pool.clock (backoff_delay pool attempt);
    create_connection_with_retry pool endpoint (attempt + 1)
  in
  match connect_once pool endpoint with
  | conn -> conn
  | exception (Eio.Cancel.Cancelled _ as e) -> raise e
  | exception Eio.Time.Timeout ->
      Log.warn (fun m ->
          m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
      if attempt < max_attempts then retry ()
      else begin
        Log.err (fun m ->
            m "Failed to connect to %a after %d attempts" Endpoint.pp endpoint
              max_attempts);
        failwith
          (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
             (Endpoint.host endpoint) (Endpoint.port endpoint) max_attempts)
      end
  | exception e ->
      Log.warn (fun m ->
          m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp
            endpoint (Printexc.to_string e));
      if attempt < max_attempts then retry () else raise e

let create_connection pool endpoint = create_connection_with_retry pool endpoint 1

(** {1 Connection Validation} *)

(* Run the user-supplied [Config.health_check], if any. Returning [false] or
   raising counts as a failure; cancellation is propagated. *)
let passes_health_check pool conn =
  match pool.config.health_check with
  | None -> true
  | Some check -> (
      let endpoint = Connection.endpoint conn in
      match check (Connection.flow conn) with
      | true -> true
      | false ->
          Log.debug (fun m ->
              m "Connection to %a failed custom health check" Endpoint.pp
                endpoint);
          false
      | exception (Eio.Cancel.Cancelled _ as e) -> raise e
      | exception e ->
          Log.debug (fun m ->
              m "Connection to %a health check raised exception: %s"
                Endpoint.pp endpoint (Printexc.to_string e));
          false)

(* Decide whether [conn] may be (re)used: it must not be closed, must be
   within the lifetime, idle-time and use-count limits, and must pass the
   custom health check. [check_readable] is accepted for compatibility but
   performs no probe yet. *)
let is_healthy pool ?(check_readable = false) conn =
  let now = get_time pool in
  let endpoint = Connection.endpoint conn in
  let age = now -. Connection.created_at conn in
  let idle_time = now -. Connection.last_used conn in
  let use_limit_reached =
    match pool.config.max_connection_uses with
    | Some limit -> Connection.use_count conn >= limit
    | None -> false
  in
  if Connection.is_closed conn then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: already closed" Endpoint.pp endpoint);
    false
  end
  else if age > pool.config.max_connection_lifetime then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
          Endpoint.pp endpoint age pool.config.max_connection_lifetime);
    false
  end
  else if idle_time > pool.config.max_idle_time then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
          Endpoint.pp endpoint idle_time pool.config.max_idle_time);
    false
  end
  else if use_limit_reached then begin
    Log.debug (fun m ->
        m "Connection to %a unhealthy: exceeded max use count (%d)" Endpoint.pp
          endpoint (Connection.use_count conn));
    false
  end
  else if not (passes_health_check pool conn) then false
  else if check_readable then
    (* TODO avsm: a sockopt for this? No liveness probe exists yet, so a
       requested readability check always passes. *)
    true
  else begin
    Log.debug (fun m ->
        m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
          Endpoint.pp endpoint age idle_time (Connection.use_count conn));
    true
  end

(** {1 Internal Pool Operations} *)

(* Close [conn]'s flow and fire [on_connection_closed]. Idempotent: the
   second and later calls on the same connection do nothing. Errors raised by
   the close itself are ignored (best-effort). *)
let close_internal pool conn =
  if not (Connection.is_closed conn) then begin
    conn.Connection.closed <- true;
    Log.debug (fun m ->
        m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp
          (Connection.endpoint conn)
          (get_time pool -. Connection.created_at conn)
          (Connection.use_count conn));
    Eio.Cancel.protect (fun () ->
        try Eio.Flow.close (Connection.flow conn) with _ -> ());
    Option.iter
      (fun f -> f (Connection.endpoint conn))
      pool.config.on_connection_closed
  end

(* Build the [Eio.Pool] and counters for a single endpoint. *)
let make_endpoint_pool pool endpoint =
  let stats = create_endp_stats () in
  let mutex = Eio.Mutex.create () in
  let bump f = Eio.Mutex.use_rw ~protect:true mutex (fun () -> f stats) in
  Log.info (fun m ->
      m "Creating new endpoint pool for %a (max_connections=%d)" Endpoint.pp
        endpoint pool.config.max_connections_per_endpoint);
  (* Called by Eio.Pool before handing out an already-pooled connection.
     Reuse is only counted (and the hook only fired) once it has passed. *)
  let validate conn =
    Log.debug (fun m ->
        m "Validate called for connection to %a" Endpoint.pp endpoint);
    let healthy = is_healthy pool ~check_readable:false conn in
    if healthy then begin
      Log.debug (fun m ->
          m "Reusing connection to %a from pool" Endpoint.pp endpoint);
      bump (fun s -> s.total_reused <- s.total_reused + 1);
      Option.iter (fun f -> f endpoint) pool.config.on_connection_reused
    end
    else
      Log.debug (fun m ->
          m "Connection to %a failed validation, creating new one" Endpoint.pp
            endpoint);
    healthy
  in
  (* Called by Eio.Pool when a connection that failed validation is removed. *)
  let dispose conn =
    Eio.Cancel.protect (fun () ->
        close_internal pool conn;
        bump (fun s -> s.total_closed <- s.total_closed + 1))
  in
  (* Factory for fresh connections; failures (other than cancellation) are
     counted in [errors] and re-raised to the caller of [Eio.Pool.use]. *)
  let alloc () =
    Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
    match create_connection pool endpoint with
    | conn ->
        Log.debug (fun m ->
            m "Connection created successfully for %a" Endpoint.pp endpoint);
        bump (fun s -> s.total_created <- s.total_created + 1);
        Option.iter (fun f -> f endpoint) pool.config.on_connection_created;
        conn
    | exception (Eio.Cancel.Cancelled _ as e) -> raise e
    | exception e ->
        let bt = Printexc.get_raw_backtrace () in
        Log.err (fun m ->
            m "Factory function failed for %a: %s" Endpoint.pp endpoint
              (Printexc.to_string e));
        bump (fun s -> s.errors <- s.errors + 1);
        Printexc.raise_with_backtrace e bt
  in
  let eio_pool =
    Eio.Pool.create ~validate ~dispose
      pool.config.max_connections_per_endpoint alloc
  in
  Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
  { pool = eio_pool; stats; mutex }

(* Look up the pool for [endpoint] under [endpoints_mutex]. Eio.Mutex is an
   exclusive lock; [use_ro] only means the mutex is not poisoned on error. *)
let find_endpoint_pool pool endpoint =
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      EndpointTbl.find_opt pool.endpoints endpoint)

let get_or_create_endpoint_pool pool endpoint =
  Log.debug (fun m ->
      m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
  match find_endpoint_pool pool endpoint with
  | Some ep_pool ->
      Log.debug (fun m ->
          m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
      ep_pool
  | None ->
      Log.debug (fun m ->
          m "No existing pool, need to create for %a" Endpoint.pp endpoint);
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          (* Re-check: another fiber may have created it after our lookup. *)
          match EndpointTbl.find_opt pool.endpoints endpoint with
          | Some ep_pool ->
              Log.debug (fun m ->
                  m "Another fiber created pool for %a" Endpoint.pp endpoint);
              ep_pool
          | None ->
              let ep_pool = make_endpoint_pool pool endpoint in
              EndpointTbl.replace pool.endpoints endpoint ep_pool;
              Log.debug (fun m ->
                  m "Endpoint pool added to hashtable for %a" Endpoint.pp
                    endpoint);
              ep_pool)

(** {1 Public API - Pool Creation} *)

(** Create a pool whose connections are opened on [sw]; when [sw] is
    released the pool's bookkeeping is dropped and the switch closes the
    sockets it owns. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
  Log.info (fun m ->
      m
        "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \
         max_lifetime=%.1fs)"
        config.max_connections_per_endpoint config.max_idle_time
        config.max_connection_lifetime);
  let pool =
    {
      sw;
      net;
      clock;
      config;
      tls;
      endpoints = EndpointTbl.create 16;
      endpoints_mutex = Eio.Mutex.create ();
    }
  in
  Eio.Switch.on_release sw (fun () ->
      Eio.Cancel.protect (fun () ->
          Log.info (fun m -> m "Closing connection pool");
          (* Sockets were opened with [~sw] so the switch closes them; only
             the table needs clearing here. *)
          EndpointTbl.clear pool.endpoints));
  pool

(** {1 Public API - Connection Management} *)

(** [with_connection pool endpoint f] runs [f] on a pooled flow for
    [endpoint]. If [f] raises, the connection is closed (and will be
    discarded instead of reused) and the exception is re-raised. *)
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint
    f =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = get_or_create_endpoint_pool pool endpoint in
  let update g =
    Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> g ep_pool.stats)
  in
  update (fun s -> s.active <- s.active + 1);
  Fun.protect
    ~finally:(fun () ->
      update (fun s -> s.active <- s.active - 1);
      Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
    (fun () ->
      Eio.Pool.use ep_pool.pool (fun conn ->
          Log.debug (fun m ->
              m "Using connection to %a (uses=%d)" Endpoint.pp endpoint
                (Connection.use_count conn));
          conn.Connection.last_used <- get_time pool;
          conn.Connection.use_count <- conn.Connection.use_count + 1;
          (* Taken out of the idle set. *)
          update (fun s -> s.idle <- max 0 (s.idle - 1));
          match f (Connection.flow conn) with
          | result ->
              (* Back to the idle set; idle time starts counting now. *)
              conn.Connection.last_used <- get_time pool;
              update (fun s -> s.idle <- s.idle + 1);
              result
          | exception e ->
              let bt = Printexc.get_raw_backtrace () in
              Log.warn (fun m ->
                  m "Error using connection to %a: %s" Endpoint.pp endpoint
                    (Printexc.to_string e));
              (* Marks the connection closed so validation rejects it when
                 Eio.Pool offers it again. *)
              close_internal pool conn;
              update (fun s -> s.errors <- s.errors + 1);
              Printexc.raise_with_backtrace e bt))

(** Open a dedicated (non-pooled) connection that is closed when [sw] is
    released. *)
let acquire ~sw (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
  let conn = create_connection pool endpoint in
  Eio.Switch.on_release sw (fun () -> close_internal pool conn);
  conn

(** Not implemented yet.
    @raise Failure always. *)
let release (_pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) _conn =
  failwith
    "release: manual connection management not yet implemented - use \
     with_connection instead"

(** Close [conn]; calling it more than once is harmless. *)
let close (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
  close_internal pool conn

let validate_and_release (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn
    =
  if is_healthy pool conn then release pool conn else close pool conn

(** {1 Public API - Statistics} *)

(** Counters for [endpoint]; all zero if no pool exists for it yet. *)
let stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
  match find_endpoint_pool pool endpoint with
  | Some ep_pool ->
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None ->
      {
        Stats.active = 0;
        idle = 0;
        total_created = 0;
        total_reused = 0;
        total_closed = 0;
        errors = 0;
      }

let all_stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      EndpointTbl.fold
        (fun endpoint ep_pool acc ->
          let stats =
            Eio.Mutex.use_ro ep_pool.mutex (fun () ->
                snapshot_stats ep_pool.stats)
          in
          (endpoint, stats) :: acc)
        pool.endpoints [])

(** {1 Public API - Pool Management} *)

(** Forget the pool for [endpoint]; later requests create a fresh one.
    NOTE(review): idle connections held by the forgotten Eio.Pool are not
    closed here; they stay open until the pool's switch is released. *)
let close_all_connections (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t)
    endpoint =
  Eio.Cancel.protect (fun () ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          EndpointTbl.remove pool.endpoints endpoint))

(** Forget every endpoint pool. As with [close_all_connections], open
    sockets are closed by the pool's switch, not here. *)
let close_pool (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
  Eio.Cancel.protect (fun () ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          EndpointTbl.clear pool.endpoints))