···
1
+
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
3
+
(* Log source under which every message emitted by this library is tagged. *)
let src = Logs.Src.create ~doc:"Connection pooling library" "conpool"

(* Logging functions bound to [src]; the rest of the file calls them as
   [Log.debug], [Log.info], [Log.warn] and [Log.err]. *)
module Log = (val (Logs.src_log src) : Logs.LOG)
6
+
module Endpoint = struct
12
+
(** [make ~host ~port] builds an endpoint record from the labelled [host] and
    [port] arguments; no validation is performed here. *)
let make ~host ~port = { host; port }
18
+
Format.fprintf fmt "%s:%d" t.host t.port
21
+
String.equal t1.host t2.host && t1.port = t2.port
24
+
Hashtbl.hash (t.host, t.port)
27
+
module Tls_config = struct
29
+
config : Tls.Config.client;
30
+
servername : string option;
33
+
(** [make ~config ?servername ()] pairs a TLS client configuration with an
    optional server name. The trailing [()] lets [?servername] be omitted. *)
let make ~config ?servername () = { config; servername }

(** TLS client configuration stored in the value. *)
let config { config; _ } = config

(** Server name stored in the value, or [None] when none was supplied. *)
let servername { servername; _ } = servername
39
+
Format.fprintf fmt "TLS(servername=%s)"
40
+
(match t.servername with Some s -> s | None -> "<default>")
43
+
(* Internal connection type - not exposed in public API *)
44
+
module Connection = struct
46
+
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
48
+
mutable last_used : float;
49
+
mutable use_count : int;
50
+
endpoint : Endpoint.t;
54
+
(** Endpoint stored in the connection record. *)
let endpoint { endpoint; _ } = endpoint

(** Value of the [created_at] field. *)
let created_at { created_at; _ } = created_at

(** Current value of the mutable [last_used] field. *)
let last_used { last_used; _ } = last_used

(** Current value of the mutable [use_count] field. *)
let use_count { use_count; _ } = use_count
60
+
module Config = struct
62
+
max_connections_per_endpoint : int;
63
+
max_idle_time : float;
64
+
max_connection_lifetime : float;
65
+
max_connection_uses : int option;
66
+
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
67
+
connect_timeout : float option;
68
+
connect_retry_count : int;
69
+
connect_retry_delay : float;
70
+
on_connection_created : (Endpoint.t -> unit) option;
71
+
on_connection_closed : (Endpoint.t -> unit) option;
72
+
on_connection_reused : (Endpoint.t -> unit) option;
76
+
?(max_connections_per_endpoint = 10)
77
+
?(max_idle_time = 60.0)
78
+
?(max_connection_lifetime = 300.0)
79
+
?max_connection_uses
81
+
?(connect_timeout = 10.0)
82
+
?(connect_retry_count = 3)
83
+
?(connect_retry_delay = 0.1)
84
+
?on_connection_created
85
+
?on_connection_closed
86
+
?on_connection_reused
89
+
max_connections_per_endpoint;
91
+
max_connection_lifetime;
92
+
max_connection_uses;
94
+
connect_timeout = Some connect_timeout;
95
+
connect_retry_count;
96
+
connect_retry_delay;
97
+
on_connection_created;
98
+
on_connection_closed;
99
+
on_connection_reused;
102
+
(** Configuration obtained by calling [make] with no optional argument. *)
let default = make ()

(** Field accessors: each one returns the field of the same name. *)

let max_connections_per_endpoint { max_connections_per_endpoint; _ } =
  max_connections_per_endpoint

let max_idle_time { max_idle_time; _ } = max_idle_time

let max_connection_lifetime { max_connection_lifetime; _ } =
  max_connection_lifetime

let max_connection_uses { max_connection_uses; _ } = max_connection_uses

let health_check { health_check; _ } = health_check

let connect_timeout { connect_timeout; _ } = connect_timeout

let connect_retry_count { connect_retry_count; _ } = connect_retry_count

let connect_retry_delay { connect_retry_delay; _ } = connect_retry_delay
116
+
- max_connections_per_endpoint: %d@,\
117
+
- max_idle_time: %.1fs@,\
118
+
- max_connection_lifetime: %.1fs@,\
119
+
- max_connection_uses: %s@,\
120
+
- connect_timeout: %s@,\
121
+
- connect_retry_count: %d@,\
122
+
- connect_retry_delay: %.2fs@]"
123
+
t.max_connections_per_endpoint
125
+
t.max_connection_lifetime
126
+
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
127
+
(match t.connect_timeout with Some f -> Printf.sprintf "%.1fs" f | None -> "none")
128
+
t.connect_retry_count
129
+
t.connect_retry_delay
132
+
module Stats = struct
136
+
total_created : int;
137
+
total_reused : int;
138
+
total_closed : int;
142
+
(** Field accessors for a statistics snapshot: each one returns the field of
    the same name. *)

let active { active; _ } = active

let idle { idle; _ } = idle

let total_created { total_created; _ } = total_created

let total_reused { total_reused; _ } = total_reused

let total_closed { total_closed; _ } = total_closed

let errors { errors; _ } = errors
166
+
type endp_stats = {
167
+
mutable active : int;
168
+
mutable idle : int;
169
+
mutable total_created : int;
170
+
mutable total_reused : int;
171
+
mutable total_closed : int;
172
+
mutable errors : int;
175
+
type endpoint_pool = {
176
+
pool : Connection.t Eio.Pool.t;
177
+
stats : endp_stats;
178
+
mutex : Eio.Mutex.t;
181
+
type ('clock, 'net) t = {
186
+
tls : Tls_config.t option;
187
+
endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
188
+
endpoints_mutex : Eio.Mutex.t;
191
+
module EndpointTbl = Hashtbl.Make(struct
192
+
type t = Endpoint.t
193
+
let equal = Endpoint.equal
194
+
let hash = Endpoint.hash
197
+
(* Current time as reported by the clock stored in [pool]. *)
let get_time pool = pool.clock |> Eio.Time.now
200
+
let create_endp_stats () = {
209
+
let snapshot_stats (stats : endp_stats) : Stats.t = {
210
+
active = stats.active;
212
+
total_created = stats.total_created;
213
+
total_reused = stats.total_reused;
214
+
total_closed = stats.total_closed;
215
+
errors = stats.errors;
218
+
(** {1 DNS Resolution} *)
220
+
let resolve_endpoint pool endpoint =
221
+
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
222
+
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
223
+
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
226
+
Log.debug (fun m -> m "Resolved %a to %a"
227
+
Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
230
+
Log.err (fun m -> m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
231
+
failwith (Printf.sprintf "Failed to resolve hostname: %s" (Endpoint.host endpoint))
233
+
(** {1 Connection Creation with Retry} *)
235
+
(** [create_connection_with_retry pool endpoint attempt] tries to open a
    connection to [endpoint]; [attempt] is the number of the current try
    ([create_connection] starts it at 1).

    Several lines of this function are not visible in this view, so the
    description below covers only what the visible lines show:
    - once [attempt] exceeds [pool.config.connect_retry_count] the function
      logs an error and raises [Failure];
    - each try resolves the endpoint again through [resolve_endpoint], then
      calls [Eio.Net.connect] on [pool.sw], going through
      [Eio.Time.with_timeout_exn] in one branch of the match on
      [pool.config.connect_timeout];
    - when [pool.tls] is not [None] the socket is wrapped with
      [Tls_eio.client_of_flow], using the configured server name or, failing
      that, the endpoint host;
    - a failed try sleeps [connect_retry_delay *. 2.0 ** (attempt - 1)]
      seconds and recurses with [attempt + 1].

    @raise Failure when the retry budget is exhausted. *)
let rec create_connection_with_retry pool endpoint attempt =
236
+
if attempt > pool.config.connect_retry_count then begin
237
+
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
238
+
Endpoint.pp endpoint pool.config.connect_retry_count);
239
+
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
240
+
(Endpoint.host endpoint) (Endpoint.port endpoint) pool.config.connect_retry_count)
243
+
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
244
+
Endpoint.pp endpoint attempt pool.config.connect_retry_count);
247
+
let addr = resolve_endpoint pool endpoint in
248
+
Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);
250
+
(* Connect with optional timeout *)
252
+
match pool.config.connect_timeout with
254
+
Eio.Time.with_timeout_exn pool.clock timeout
255
+
(fun () -> Eio.Net.connect ~sw:pool.sw pool.net addr)
257
+
Eio.Net.connect ~sw:pool.sw pool.net addr
260
+
Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
262
+
let flow = match pool.tls with
263
+
| None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
265
+
Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
266
+
let host = match Tls_config.servername tls_cfg with
267
+
| Some name -> Domain_name.(host_exn (of_string_exn name))
268
+
| None -> Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
270
+
let tls_flow = Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket in
271
+
Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
272
+
(tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
275
+
let now = get_time pool in
276
+
Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
286
+
(* NOTE(review): this branch always sleeps and recurses. When [attempt]
   already equals [connect_retry_count], the backoff delay is spent only for
   the recursive call to fail at the guard at the top of the function. The
   other-errors branch below checks [attempt < connect_retry_count] before
   sleeping - confirm whether the two branches are meant to agree. *)
| Eio.Time.Timeout ->
287
+
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
288
+
(* Exponential backoff *)
289
+
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
290
+
Eio.Time.sleep pool.clock delay;
291
+
create_connection_with_retry pool endpoint (attempt + 1)
293
+
(* Other errors - retry with backoff *)
(* NOTE(review): the pattern guarding this branch is not visible in this view.
   If it is a catch-all, it would also intercept Eio cancellation exceptions
   and turn them into retries - confirm against the full source. *)
294
+
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
295
+
attempt Endpoint.pp endpoint (Printexc.to_string e));
296
+
if attempt < pool.config.connect_retry_count then (
297
+
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
298
+
Eio.Time.sleep pool.clock delay;
299
+
create_connection_with_retry pool endpoint (attempt + 1)
303
+
(* Entry point for opening a connection: starts the retry loop at attempt
   number 1. *)
let create_connection pool endpoint =
  let first_attempt = 1 in
  create_connection_with_retry pool endpoint first_attempt
306
+
(** {1 Connection Validation} *)
308
+
let is_healthy pool ?(check_readable = false) conn =
309
+
let now = get_time pool in
312
+
let age = now -. Connection.created_at conn in
313
+
if age > pool.config.max_connection_lifetime then begin
314
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
315
+
Endpoint.pp (Connection.endpoint conn) age pool.config.max_connection_lifetime);
319
+
(* Check idle time *)
320
+
else if (now -. Connection.last_used conn) > pool.config.max_idle_time then begin
321
+
let idle_time = now -. Connection.last_used conn in
322
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
323
+
Endpoint.pp (Connection.endpoint conn) idle_time pool.config.max_idle_time);
327
+
(* Check use count *)
328
+
else if (match pool.config.max_connection_uses with
329
+
| Some max -> Connection.use_count conn >= max
330
+
| None -> false) then begin
331
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
332
+
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
336
+
(* Optional: custom health check *)
337
+
else if (match pool.config.health_check with
340
+
let healthy = check (Connection.flow conn) in
341
+
if not healthy then
342
+
Log.debug (fun m -> m "Connection to %a failed custom health check"
343
+
Endpoint.pp (Connection.endpoint conn));
346
+
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
347
+
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
348
+
true) (* Exception in health check = unhealthy *)
349
+
| None -> false) then
352
+
(* Optional: check if socket still connected *)
353
+
else if check_readable then
355
+
(* TODO avsm: a sockopt for this? *)
361
+
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
362
+
Endpoint.pp (Connection.endpoint conn)
364
+
(now -. Connection.last_used conn)
365
+
(Connection.use_count conn));
369
+
(** {1 Internal Pool Operations} *)
371
+
let close_internal pool conn =
372
+
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
373
+
Endpoint.pp (Connection.endpoint conn)
374
+
(get_time pool -. Connection.created_at conn)
375
+
(Connection.use_count conn));
377
+
Eio.Cancel.protect (fun () ->
379
+
Eio.Flow.close (Connection.flow conn)
383
+
(* Call hook if configured *)
384
+
Option.iter (fun f -> f (Connection.endpoint conn)) pool.config.on_connection_closed
386
+
let get_or_create_endpoint_pool pool endpoint =
387
+
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
389
+
(* First try with read lock *)
390
+
match Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
391
+
Hashtbl.find_opt pool.endpoints endpoint
394
+
Log.debug (fun m -> m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
397
+
Log.debug (fun m -> m "No existing pool, need to create for %a" Endpoint.pp endpoint);
398
+
(* Need to create - use write lock *)
399
+
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
400
+
(* Check again in case another fiber created it *)
401
+
match Hashtbl.find_opt pool.endpoints endpoint with
403
+
Log.debug (fun m -> m "Another fiber created pool for %a" Endpoint.pp endpoint);
406
+
(* Create new endpoint pool *)
407
+
let stats = create_endp_stats () in
408
+
let mutex = Eio.Mutex.create () in
410
+
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
411
+
Endpoint.pp endpoint pool.config.max_connections_per_endpoint);
413
+
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
415
+
let eio_pool = Eio.Pool.create
416
+
pool.config.max_connections_per_endpoint
417
+
~validate:(fun conn ->
418
+
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
419
+
(* Called before reusing from pool *)
420
+
let healthy = is_healthy pool ~check_readable:false conn in
423
+
Log.debug (fun m -> m "Reusing connection to %a from pool" Endpoint.pp endpoint);
425
+
(* Update stats for reuse *)
426
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
427
+
stats.total_reused <- stats.total_reused + 1
430
+
(* Call hook if configured *)
431
+
Option.iter (fun f -> f endpoint) pool.config.on_connection_reused;
433
+
(* Run health check if configured *)
434
+
match pool.config.health_check with
436
+
(try check (Connection.flow conn)
440
+
Log.debug (fun m -> m "Connection to %a failed validation, creating new one" Endpoint.pp endpoint);
444
+
~dispose:(fun conn ->
445
+
(* Called when removing from pool *)
446
+
Eio.Cancel.protect (fun () ->
447
+
close_internal pool conn;
450
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
451
+
stats.total_closed <- stats.total_closed + 1
456
+
Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
458
+
let conn = create_connection pool endpoint in
460
+
Log.debug (fun m -> m "Connection created successfully for %a" Endpoint.pp endpoint);
463
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
464
+
stats.total_created <- stats.total_created + 1
467
+
(* Call hook if configured *)
468
+
Option.iter (fun f -> f endpoint) pool.config.on_connection_created;
472
+
Log.err (fun m -> m "Factory function failed for %a: %s"
473
+
Endpoint.pp endpoint (Printexc.to_string e));
474
+
(* Update error stats *)
475
+
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
476
+
stats.errors <- stats.errors + 1
482
+
Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
490
+
Hashtbl.add pool.endpoints endpoint ep_pool;
491
+
Log.debug (fun m -> m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
495
+
(** {1 Public API - Pool Creation} *)
497
+
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
498
+
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
499
+
config.max_connections_per_endpoint
500
+
config.max_idle_time
501
+
config.max_connection_lifetime);
509
+
endpoints = Hashtbl.create 16;
510
+
endpoints_mutex = Eio.Mutex.create ();
513
+
(* Auto-cleanup on switch release *)
514
+
Eio.Switch.on_release sw (fun () ->
515
+
Eio.Cancel.protect (fun () ->
516
+
Log.info (fun m -> m "Closing connection pool");
517
+
(* Close all idle connections - active ones will be cleaned up by switch *)
518
+
Hashtbl.iter (fun _endpoint _ep_pool ->
519
+
(* Connections are bound to the switch and will be auto-closed *)
523
+
Hashtbl.clear pool.endpoints
529
+
(** {1 Public API - Connection Management} *)
531
+
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint f =
532
+
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
533
+
let ep_pool = get_or_create_endpoint_pool pool endpoint in
535
+
(* Increment active count *)
536
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
537
+
ep_pool.stats.active <- ep_pool.stats.active + 1
541
+
~finally:(fun () ->
542
+
(* Decrement active count *)
543
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
544
+
ep_pool.stats.active <- ep_pool.stats.active - 1
546
+
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)
549
+
(* Use Eio.Pool for resource management *)
550
+
Eio.Pool.use ep_pool.pool (fun conn ->
551
+
Log.debug (fun m -> m "Using connection to %a (uses=%d)"
552
+
Endpoint.pp endpoint (Connection.use_count conn));
554
+
(* Update last used time and use count *)
555
+
conn.last_used <- get_time pool;
556
+
conn.use_count <- conn.use_count + 1;
558
+
(* Update idle stats (connection taken from idle pool) *)
559
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
560
+
ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)
564
+
let result = f conn.flow in
566
+
(* Success - connection will be returned to pool by Eio.Pool *)
567
+
(* Update idle stats (connection returned to idle pool) *)
568
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
569
+
ep_pool.stats.idle <- ep_pool.stats.idle + 1
574
+
(* Error - close connection so it won't be reused *)
575
+
Log.warn (fun m -> m "Error using connection to %a: %s"
576
+
Endpoint.pp endpoint (Printexc.to_string e));
577
+
close_internal pool conn;
579
+
(* Update error stats *)
580
+
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
581
+
ep_pool.stats.errors <- ep_pool.stats.errors + 1
588
+
(** {1 Public API - Statistics} *)
590
+
(** [stats pool endpoint] looks [endpoint] up in [pool.endpoints]. When an
    endpoint pool exists, its counters are copied with [snapshot_stats] while
    holding that endpoint pool's mutex through [Eio.Mutex.use_ro]. The value
    returned when no pool exists is built on lines not visible in this view. *)
let stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
591
+
(* NOTE(review): this lookup reads [pool.endpoints] without taking
   [pool.endpoints_mutex], whereas [all_stats] and
   [get_or_create_endpoint_pool] take it before touching the table - confirm
   whether the unlocked read is intended. *)
match Hashtbl.find_opt pool.endpoints endpoint with
593
+
Eio.Mutex.use_ro ep_pool.mutex (fun () ->
594
+
snapshot_stats ep_pool.stats
597
+
(* No pool for this endpoint yet *)
607
+
(** [all_stats pool] folds over [pool.endpoints] while holding
    [pool.endpoints_mutex] through [Eio.Mutex.use_ro], and builds a list of
    [(endpoint, stats)] pairs. Each snapshot is taken with [snapshot_stats]
    under that endpoint pool's own mutex. The list follows [Hashtbl.fold]
    iteration order; no sorting is visible here. *)
let all_stats (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) =
608
+
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
609
+
Hashtbl.fold (fun endpoint ep_pool acc ->
610
+
let stats = Eio.Mutex.use_ro ep_pool.mutex (fun () ->
611
+
snapshot_stats ep_pool.stats
613
+
(* Prepend, so the result is built in reverse fold order. *)
(endpoint, stats) :: acc
614
+
) pool.endpoints []
617
+
(** {1 Public API - Pool Management} *)
619
+
(** [clear_endpoint pool endpoint] logs the request and looks [endpoint] up in
    [pool.endpoints]. When an entry exists, it is removed from the table under
    [pool.endpoints_mutex] (write access), inside [Eio.Cancel.protect]. When
    no entry exists, only a debug message is logged. *)
let clear_endpoint (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
620
+
Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
621
+
(* NOTE(review): this lookup reads [pool.endpoints] without taking
   [pool.endpoints_mutex]; only the removal below is done under the lock -
   confirm whether the unlocked read is intended. *)
match Hashtbl.find_opt pool.endpoints endpoint with
623
+
Eio.Cancel.protect (fun () ->
624
+
(* Remove endpoint pool from hashtable *)
625
+
(* Idle connections will be discarded *)
626
+
(* Active connections will be closed when returned *)
(* NOTE(review): the visible code only removes the table entry; no call that
   disposes of idle connections appears in this view - confirm where the
   behaviour described by the two comments above is implemented. *)
627
+
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
628
+
Hashtbl.remove pool.endpoints endpoint
632
+
Log.debug (fun m -> m "No endpoint pool found for %a" Endpoint.pp endpoint)