My agentic slop goes here. Not intended for anyone else!
1(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
2
(* Log source under which every message of this library is reported. *)
let src = Logs.Src.create ~doc:"Connection pooling library" "conpool"

(* First-class logging module bound to [src]; used as [Log.debug], [Log.info], ... *)
module Log = (val Logs.src_log src : Logs.LOG)
5
module Endpoint = struct
  (** A remote peer identified by a host name and a port number. *)
  type t = { host : string; port : int }

  (** [make ~host ~port] builds an endpoint from its two components. *)
  let make ~host ~port = { host; port }

  let host { host; _ } = host
  let port { port; _ } = port

  (** Prints the endpoint as [host:port]. *)
  let pp fmt { host; port } = Format.fprintf fmt "%s:%d" host port

  (** Two endpoints are equal when both host and port match. *)
  let equal a b = Int.equal a.port b.port && String.equal a.host b.host

  (** Hash derived from the [(host, port)] pair, consistent with {!equal}. *)
  let hash { host; port } = Hashtbl.hash (host, port)
end
26
module Tls_config = struct
  (** TLS client settings plus an optional server name override. *)
  type t = { config : Tls.Config.client; servername : string option }

  (** [make ~config ?servername ()] bundles the two values into a record. *)
  let make ~config ?servername () = { config; servername }

  let config { config; _ } = config
  let servername { servername; _ } = servername

  (** Prints only the server name; ["<default>"] stands for [None]. *)
  let pp fmt { servername; _ } =
    let shown = Option.value servername ~default:"<default>" in
    Format.fprintf fmt "TLS(servername=%s)" shown
end
42
module Connection = struct
  (** A pooled flow together with the bookkeeping used to decide whether it
      may still be reused. *)
  type t = {
    flow : [ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t;
    created_at : float;  (** clock reading taken when the connection was made *)
    mutable last_used : float;  (** clock reading of the most recent use *)
    mutable use_count : int;  (** number of times the connection was handed out *)
    endpoint : Endpoint.t;
  }

  let flow { flow; _ } = flow
  let endpoint { endpoint; _ } = endpoint
  let created_at { created_at; _ } = created_at
  let last_used { last_used; _ } = last_used
  let use_count { use_count; _ } = use_count

  (** Prints the endpoint, both timestamps and the use counter. *)
  let pp fmt { endpoint; created_at; last_used; use_count; _ } =
    Format.fprintf fmt
      "Connection(endpoint=%a, created=%.2f, last_used=%.2f, uses=%d)"
      Endpoint.pp endpoint created_at last_used use_count
end
65
module Config = struct
  (** Tunables of a connection pool. *)
  type t = {
    max_connections_per_endpoint : int;
    max_idle_time : float;
    max_connection_lifetime : float;
    max_connection_uses : int option;
    health_check :
      ([ `Close | `Flow | `R | `Shutdown | `W ] Eio.Resource.t -> bool) option;
    connect_timeout : float option;
    connect_retry_count : int;
    connect_retry_delay : float;
    on_connection_created : (Endpoint.t -> unit) option;
    on_connection_closed : (Endpoint.t -> unit) option;
    on_connection_reused : (Endpoint.t -> unit) option;
  }

  (** Build a configuration; every omitted argument takes the default written
      in the signature. Note that [connect_timeout] is always stored as
      [Some _] by this constructor. *)
  let make ?(max_connections_per_endpoint = 10) ?(max_idle_time = 60.0)
      ?(max_connection_lifetime = 300.0) ?max_connection_uses ?health_check
      ?(connect_timeout = 10.0) ?(connect_retry_count = 3)
      ?(connect_retry_delay = 0.1) ?on_connection_created
      ?on_connection_closed ?on_connection_reused () =
    let connect_timeout = Some connect_timeout in
    {
      max_connections_per_endpoint;
      max_idle_time;
      max_connection_lifetime;
      max_connection_uses;
      health_check;
      connect_timeout;
      connect_retry_count;
      connect_retry_delay;
      on_connection_created;
      on_connection_closed;
      on_connection_reused;
    }

  (** The configuration obtained with all defaults. *)
  let default = make ()

  let max_connections_per_endpoint { max_connections_per_endpoint; _ } =
    max_connections_per_endpoint

  let max_idle_time { max_idle_time; _ } = max_idle_time
  let max_connection_lifetime { max_connection_lifetime; _ } =
    max_connection_lifetime

  let max_connection_uses { max_connection_uses; _ } = max_connection_uses
  let health_check { health_check; _ } = health_check
  let connect_timeout { connect_timeout; _ } = connect_timeout
  let connect_retry_count { connect_retry_count; _ } = connect_retry_count
  let connect_retry_delay { connect_retry_delay; _ } = connect_retry_delay

  (** Multi-line dump of the numeric settings (hooks and the health check are
      not printed). *)
  let pp fmt t =
    let uses =
      Option.fold ~none:"unlimited" ~some:string_of_int t.max_connection_uses
    in
    let timeout =
      Option.fold ~none:"none" ~some:(Printf.sprintf "%.1fs") t.connect_timeout
    in
    Format.fprintf fmt
      "@[<v>Config:@,\
       - max_connections_per_endpoint: %d@,\
       - max_idle_time: %.1fs@,\
       - max_connection_lifetime: %.1fs@,\
       - max_connection_uses: %s@,\
       - connect_timeout: %s@,\
       - connect_retry_count: %d@,\
       - connect_retry_delay: %.2fs@]"
      t.max_connections_per_endpoint t.max_idle_time t.max_connection_lifetime
      uses timeout t.connect_retry_count t.connect_retry_delay
end
137
module Stats = struct
  (** Immutable snapshot of the counters kept for one endpoint. *)
  type t = {
    active : int;
    idle : int;
    total_created : int;
    total_reused : int;
    total_closed : int;
    errors : int;
  }

  let active { active; _ } = active
  let idle { idle; _ } = idle
  let total_created { total_created; _ } = total_created
  let total_reused { total_reused; _ } = total_reused
  let total_closed { total_closed; _ } = total_closed
  let errors { errors; _ } = errors

  (** Prints one counter per line inside a vertical box. *)
  let pp fmt { active; idle; total_created; total_reused; total_closed; errors } =
    Format.fprintf fmt
      "@[<v>Stats:@,\
       - Active: %d@,\
       - Idle: %d@,\
       - Created: %d@,\
       - Reused: %d@,\
       - Closed: %d@,\
       - Errors: %d@]"
      active idle total_created total_reused total_closed errors
end
171
(** Mutable counters kept per endpoint; they mirror the fields of {!Stats.t}. *)
type endp_stats = {
  mutable active : int;
  mutable idle : int;
  mutable total_created : int;
  mutable total_reused : int;
  mutable total_closed : int;
  mutable errors : int;
}

(** Everything the pool holds for a single endpoint. *)
type endpoint_pool = {
  pool : Connection.t Eio.Pool.t;  (** the underlying resource pool *)
  stats : endp_stats;  (** counters for this endpoint *)
  mutex : Eio.Mutex.t;  (** taken around reads and writes of [stats] *)
}

(** A connection pool, parameterised by the types of its clock and network. *)
type ('clock, 'net) t = {
  sw : Eio.Switch.t;
  net : 'net;
  clock : 'clock;
  config : Config.t;
  tls : Tls_config.t option;
  endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
  endpoints_mutex : Eio.Mutex.t;  (** taken around accesses to [endpoints] *)
}

(* Hash table specialised to endpoint keys. [Endpoint] already provides the
   [t], [equal] and [hash] members the functor expects. The [endpoints] field
   above is a plain polymorphic [Hashtbl.t], not an instance of this module. *)
module EndpointTbl = Hashtbl.Make (Endpoint)
202
(** Current reading of the pool's clock. *)
let get_time { clock; _ } = Eio.Time.now clock
205
(** Fresh set of per-endpoint counters, all starting at zero. *)
let create_endp_stats () : endp_stats =
  let zero = 0 in
  {
    active = zero;
    idle = zero;
    total_created = zero;
    total_reused = zero;
    total_closed = zero;
    errors = zero;
  }
214
(** Copy the current values of the mutable counters into an immutable
    {!Stats.t}. The caller is responsible for any locking. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  let { active; idle; total_created; total_reused; total_closed; errors } =
    stats
  in
  { Stats.active; idle; total_created; total_reused; total_closed; errors }
223
224(** {1 DNS Resolution} *)
225
(** [resolve_endpoint pool endpoint] looks the endpoint's host up through the
    pool's network (using the port as the service) and returns the first
    address of the result.
    @raise Failure if the lookup returns no address. *)
let resolve_endpoint pool endpoint =
  let hostname = Endpoint.host endpoint in
  let service = string_of_int (Endpoint.port endpoint) in
  Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
  let candidates = Eio.Net.getaddrinfo_stream pool.net hostname ~service in
  Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
  match candidates with
  | [] ->
    Log.err (fun m -> m "Failed to resolve hostname: %s" hostname);
    Printf.ksprintf failwith "Failed to resolve hostname: %s" hostname
  | first :: _ ->
    Log.debug (fun m -> m "Resolved %a to %a"
      Endpoint.pp endpoint Eio.Net.Sockaddr.pp first);
    first
238
239(** {1 Connection Creation with Retry} *)
240
(** [create_connection_with_retry pool endpoint attempt] resolves [endpoint],
    opens a TCP connection on the pool's switch (bounded by
    [connect_timeout] when set), optionally wraps it in TLS, and returns a
    fresh {!Connection.t} with a zero use count.

    [attempt] is 1-based. A failed attempt sleeps for
    [connect_retry_delay * 2^(attempt-1)] and then tries again.

    Cancellation is never treated as a failed attempt: it is re-raised
    immediately so the fiber can unwind.

    @raise Failure once [attempt] exceeds [connect_retry_count].
    @raise e the last error, when a non-timeout failure occurs on the final
    permitted attempt. *)
let rec create_connection_with_retry pool endpoint attempt =
  let max_attempts = pool.config.connect_retry_count in
  if attempt > max_attempts then begin
    Log.err (fun m -> m "Failed to connect to %a after %d attempts"
      Endpoint.pp endpoint max_attempts);
    failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
      (Endpoint.host endpoint) (Endpoint.port endpoint) max_attempts)
  end;

  Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
    Endpoint.pp endpoint attempt max_attempts);

  (* Exponential backoff shared by both retry paths. *)
  let retry_after_backoff () =
    let delay =
      pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1))
    in
    Eio.Time.sleep pool.clock delay;
    create_connection_with_retry pool endpoint (attempt + 1)
  in

  try
    let addr = resolve_endpoint pool endpoint in
    Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);

    (* Connect with optional timeout *)
    let socket =
      match pool.config.connect_timeout with
      | Some timeout ->
        Eio.Time.with_timeout_exn pool.clock timeout
          (fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
      | None ->
        Eio.Net.connect ~sw:pool.sw pool.net addr
    in

    Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);

    let flow = match pool.tls with
      | None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
      | Some tls_cfg ->
        Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
        (try
           let host = match Tls_config.servername tls_cfg with
             | Some name -> Domain_name.(host_exn (of_string_exn name))
             | None -> Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
           in
           let tls_flow =
             Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket
           in
           Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
           (tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
         with tls_error ->
           (* The socket is attached to [pool.sw], so without an explicit
              close every failed TLS setup would keep a descriptor open until
              the whole pool is torn down. Closing is best-effort. *)
           let bt = Printexc.get_raw_backtrace () in
           Eio.Cancel.protect (fun () ->
             try Eio.Flow.close socket with _ -> ());
           Printexc.raise_with_backtrace tls_error bt)
    in

    let now = get_time pool in
    Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
    {
      Connection.flow;
      created_at = now;
      last_used = now;
      use_count = 0;
      endpoint;
    }

  with
  | Eio.Cancel.Cancelled _ as cancelled ->
    (* Never retry on cancellation. *)
    raise cancelled
  | Eio.Time.Timeout ->
    Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
    (* Always recurse: the guard at the top turns an exhausted budget into
       the "after N attempts" failure. *)
    retry_after_backoff ()
  | e ->
    (* Other errors - retry with backoff *)
    Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
      attempt Endpoint.pp endpoint (Printexc.to_string e));
    if attempt < max_attempts then retry_after_backoff ()
    else raise e
308
(** Open a new connection to [endpoint], starting the retry sequence at its
    first attempt. *)
let create_connection pool endpoint =
  let first_attempt = 1 in
  create_connection_with_retry pool endpoint first_attempt
311
312(** {1 Connection Validation} *)
313
(** [is_healthy pool ?check_readable conn] decides whether [conn] may still be
    used. The checks run in this order and the first failing one yields
    [false]: maximum lifetime, maximum idle time, maximum use count, then the
    configured custom health check (an exception raised by the check counts as
    a failure).

    [check_readable] currently has no probe behind it: when it is [true] the
    function returns [true] after the checks above without logging. *)
let is_healthy pool ?(check_readable = false) conn =
  let now = get_time pool in
  let peer = Connection.endpoint conn in
  let age = now -. Connection.created_at conn in
  let idle_for = now -. Connection.last_used conn in
  let uses = Connection.use_count conn in

  let over_use_limit =
    match pool.config.max_connection_uses with
    | Some limit -> uses >= limit
    | None -> false
  in

  (* Evaluated lazily so the user callback only runs once the cheaper checks
     have passed. Returns [true] when the connection must be rejected. *)
  let fails_custom_check () =
    match pool.config.health_check with
    | None -> false
    | Some check ->
      (try
         let ok = check (Connection.flow conn) in
         if not ok then
           Log.debug (fun m -> m "Connection to %a failed custom health check"
             Endpoint.pp peer);
         not ok
       with e ->
         Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
           Endpoint.pp peer (Printexc.to_string e));
         true)
  in

  if age > pool.config.max_connection_lifetime then begin
    Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
      Endpoint.pp peer age pool.config.max_connection_lifetime);
    false
  end
  else if idle_for > pool.config.max_idle_time then begin
    Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
      Endpoint.pp peer idle_for pool.config.max_idle_time);
    false
  end
  else if over_use_limit then begin
    Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
      Endpoint.pp peer uses);
    false
  end
  else if fails_custom_check () then false
  else if check_readable then
    (* TODO avsm: a sockopt for this? *)
    true
  else begin
    Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
      Endpoint.pp peer age idle_for uses);
    true
  end
374
375(** {1 Internal Pool Operations} *)
376
(** [close_internal pool conn] closes the flow of [conn] and then invokes the
    [on_connection_closed] hook, if one is configured.

    Closing is best-effort and shielded from cancellation: an exception raised
    by the close is logged at debug level and otherwise ignored, so calling
    this on an already-closed connection is harmless. The hook runs on every
    call, whether or not the close succeeded. *)
let close_internal pool conn =
  let peer = Connection.endpoint conn in
  Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
    Endpoint.pp peer
    (get_time pool -. Connection.created_at conn)
    (Connection.use_count conn));

  Eio.Cancel.protect (fun () ->
    try Eio.Flow.close (Connection.flow conn)
    with close_error ->
      (* Record the failure so it can be diagnosed, but do not propagate it. *)
      Log.debug (fun m -> m "Ignoring error while closing connection to %a: %s"
        Endpoint.pp peer (Printexc.to_string close_error)));

  (* Call hook if configured *)
  Option.iter (fun hook -> hook peer) pool.config.on_connection_closed
391
(* Build the per-endpoint state: counters, their mutex, and an [Eio.Pool]
   whose factory, validator and disposer keep the counters and the
   configured hooks up to date. Does not register the result anywhere. *)
let make_endpoint_pool pool endpoint =
  let stats = create_endp_stats () in
  let mutex = Eio.Mutex.create () in
  (* Every counter update goes through the endpoint's mutex. *)
  let locked update = Eio.Mutex.use_rw ~protect:true mutex update in

  Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
    Endpoint.pp endpoint pool.config.max_connections_per_endpoint);
  Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);

  (* Called before a pooled connection is handed out again. [is_healthy]
     already runs the configured custom health check, so it is not invoked
     a second time here. *)
  let validate conn =
    Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
    if is_healthy pool ~check_readable:false conn then begin
      Log.debug (fun m -> m "Reusing connection to %a from pool" Endpoint.pp endpoint);
      locked (fun () -> stats.total_reused <- stats.total_reused + 1);
      Option.iter (fun hook -> hook endpoint) pool.config.on_connection_reused;
      true
    end else begin
      Log.debug (fun m -> m "Connection to %a failed validation, creating new one" Endpoint.pp endpoint);
      false
    end
  in

  (* Called when a connection is removed from the pool. *)
  let dispose conn =
    Eio.Cancel.protect (fun () ->
      close_internal pool conn;
      locked (fun () -> stats.total_closed <- stats.total_closed + 1))
  in

  (* Called when the pool needs a brand-new connection. *)
  let factory () =
    Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
    try
      let conn = create_connection pool endpoint in
      Log.debug (fun m -> m "Connection created successfully for %a" Endpoint.pp endpoint);
      locked (fun () -> stats.total_created <- stats.total_created + 1);
      Option.iter (fun hook -> hook endpoint) pool.config.on_connection_created;
      conn
    with e ->
      Log.err (fun m -> m "Factory function failed for %a: %s"
        Endpoint.pp endpoint (Printexc.to_string e));
      locked (fun () -> stats.errors <- stats.errors + 1);
      raise e
  in

  let eio_pool =
    Eio.Pool.create ~validate ~dispose
      pool.config.max_connections_per_endpoint factory
  in
  Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
  { pool = eio_pool; stats; mutex }

(** [get_or_create_endpoint_pool pool endpoint] returns the per-endpoint pool
    registered for [endpoint], creating and registering it on first use.

    The table is first consulted under the read lock; on a miss the write
    lock is taken and the table is consulted again before anything is
    created, so two fibers racing on the same endpoint end up sharing one
    pool. *)
let get_or_create_endpoint_pool pool endpoint =
  Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
  let lookup () = Hashtbl.find_opt pool.endpoints endpoint in
  match Eio.Mutex.use_ro pool.endpoints_mutex lookup with
  | Some ep_pool ->
    Log.debug (fun m -> m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
    ep_pool
  | None ->
    Log.debug (fun m -> m "No existing pool, need to create for %a" Endpoint.pp endpoint);
    Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
      (* Check again in case another fiber created it *)
      match lookup () with
      | Some ep_pool ->
        Log.debug (fun m -> m "Another fiber created pool for %a" Endpoint.pp endpoint);
        ep_pool
      | None ->
        let ep_pool = make_endpoint_pool pool endpoint in
        Hashtbl.add pool.endpoints endpoint ep_pool;
        Log.debug (fun m -> m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
        ep_pool)
499 )
500
501(** {1 Public API - Pool Creation} *)
502
(** [create ~sw ~net ~clock ?tls ?config ()] builds an empty connection pool.

    [config] defaults to {!Config.default}. A release hook is registered on
    [sw] that empties the endpoint table; the connections themselves are
    opened on [sw] and are left for the switch to close. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
  Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
    (Config.max_connections_per_endpoint config)
    (Config.max_idle_time config)
    (Config.max_connection_lifetime config));

  let endpoints = Hashtbl.create 16 in
  let endpoints_mutex = Eio.Mutex.create () in
  let pool = { sw; net; clock; config; tls; endpoints; endpoints_mutex } in

  (* Runs when [sw] is released; shielded from cancellation so the table is
     always emptied. *)
  let on_shutdown () =
    Eio.Cancel.protect (fun () ->
      Log.info (fun m -> m "Closing connection pool");
      Hashtbl.clear pool.endpoints)
  in
  Eio.Switch.on_release sw on_shutdown;

  pool
533 pool
534
535(** {1 Public API - Connection Management} *)
536
(** [with_connection pool endpoint f] borrows a connection to [endpoint] from
    the pool, passes its flow to [f] and returns the result of [f].

    The endpoint's [active] counter is incremented for the duration of the
    call. If [f] returns normally, the connection goes back to the pool for
    reuse. If [f] raises, the connection is closed, the [errors] counter is
    incremented, and the exception is re-raised with its backtrace.

    A connection closed after an error is also marked so that the pool's
    validation rejects it. [Eio.Pool.use] returns the resource to the pool
    even on failure, so without the mark a closed flow could be handed to the
    next caller. *)
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint f =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = get_or_create_endpoint_pool pool endpoint in

  (* All counter updates for this endpoint go through its mutex. *)
  let update_stats change =
    Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> change ep_pool.stats)
  in

  update_stats (fun s -> s.active <- s.active + 1);

  Fun.protect
    ~finally:(fun () ->
      update_stats (fun s -> s.active <- s.active - 1);
      Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint))
    (fun () ->
      Eio.Pool.use ep_pool.pool (fun conn ->
        Log.debug (fun m -> m "Using connection to %a (uses=%d)"
          Endpoint.pp endpoint (Connection.use_count conn));

        (* Update last used time and use count *)
        conn.Connection.last_used <- get_time pool;
        conn.Connection.use_count <- conn.Connection.use_count + 1;

        (* The connection is no longer idle; clamp at zero because a freshly
           created connection was never counted as idle. *)
        update_stats (fun s -> s.idle <- max 0 (s.idle - 1));

        match f (Connection.flow conn) with
        | result ->
          (* Success - the connection returns to the idle set. *)
          update_stats (fun s -> s.idle <- s.idle + 1);
          result
        | exception e ->
          let bt = Printexc.get_raw_backtrace () in
          Log.warn (fun m -> m "Error using connection to %a: %s"
            Endpoint.pp endpoint (Printexc.to_string e));
          close_internal pool conn;

          (* Poison the record: with [last_used] at negative infinity the
             idle-time check in [is_healthy] fails, so the next validation
             disposes of this connection instead of reusing a closed flow.
             This relies on [max_idle_time] being finite; a dedicated
             "closed" flag on [Connection.t] would be more direct. *)
          conn.Connection.last_used <- neg_infinity;

          update_stats (fun s -> s.errors <- s.errors + 1);
          Printexc.raise_with_backtrace e bt))
592 )
593
(** [acquire ~sw pool endpoint] opens a new connection to [endpoint], outside
    the per-endpoint pools, and arranges for it to be closed when [sw] is
    released. The underlying socket is opened by {!create_connection}, which
    uses the pool's own switch. *)
let acquire ~sw (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
  let conn = create_connection pool endpoint in
  let close_on_release () = close_internal pool conn in
  Eio.Switch.on_release sw close_on_release;
  conn
604
(** Placeholder for manual release of a connection.
    @raise Failure always; manual management is not implemented. *)
let release (_pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) _conn =
  let message =
    "release: manual connection management not yet implemented - use with_connection instead"
  in
  failwith message
607
(** [close pool conn] closes [conn]; see {!close_internal} for the details. *)
let close (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
  conn |> close_internal pool
610
(** [validate_and_release pool conn] closes [conn] when it fails
    {!is_healthy}, and otherwise hands it to {!release}. Since [release]
    currently always raises [Failure], so does the healthy path here. *)
let validate_and_release (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
  match is_healthy pool conn with
  | true -> release pool conn
  | false -> close pool conn
616
617(** {1 Public API - Statistics} *)
618
(** [stats pool endpoint] returns a snapshot of the counters kept for
    [endpoint], or an all-zero snapshot when no pool exists for it yet.

    The endpoint table is read under [endpoints_mutex], like every other
    access to it. The counters are then copied under the endpoint's own
    mutex. *)
let stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
  let registered =
    Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.find_opt pool.endpoints endpoint)
  in
  match registered with
  | Some ep_pool ->
    Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
  | None ->
    (* No pool for this endpoint yet: report fresh, all-zero counters. *)
    snapshot_stats (create_endp_stats ())
635
(** [all_stats pool] returns one [(endpoint, snapshot)] pair per registered
    endpoint. The table is traversed under [endpoints_mutex] and each
    snapshot is taken under that endpoint's own mutex. *)
let all_stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
  let collect endpoint ep_pool acc =
    let snapshot =
      Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats)
    in
    (endpoint, snapshot) :: acc
  in
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
    Hashtbl.fold collect pool.endpoints [])
645
646(** {1 Public API - Pool Management} *)
647
(** [close_all_connections pool endpoint] drops the table entry for
    [endpoint], if there is one. It does not close any flow itself; the
    connections are left for switch cleanup. *)
let close_all_connections (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
  if Hashtbl.mem pool.endpoints endpoint then
    (* Shielded from cancellation so the removal is not interrupted. *)
    Eio.Cancel.protect (fun () ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
        Hashtbl.remove pool.endpoints endpoint))
660
(** [close_pool pool] empties the endpoint table. No flow is closed here;
    the connections are left for switch cleanup. *)
let close_pool (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
  let forget_all () = Hashtbl.clear pool.endpoints in
  Eio.Cancel.protect forget_all
665 )