My agentic slop goes here. Not intended for anyone else!
(** Connection pooling for efficient JMAP client connection reuse.

    This module provides connection-pool bookkeeping intended to reduce
    connection overhead. For demonstration purposes it implements statistics
    tracking and connection-management concepts only: each request is still
    performed through a cohttp-eio client created for that request, so the
    "connections" tracked here are records, not live sockets.

    @see <https://www.rfc-editor.org/rfc/rfc8620.html#section-3.3> RFC 8620, Section 3.3
*)
9
(** TLS settings a caller may supply for HTTPS requests. *)
type tls_config = {
  authenticator : X509.Authenticator.t option;
      (** Custom TLS authenticator, if any. *)
  certificates : Tls.Config.own_cert list;
      (** Client certificates for mutual TLS. *)
  ciphers : Tls.Ciphersuite.ciphersuite list option;
      (** Allowed cipher suites, if restricted. *)
  version : (Tls.Core.tls_version * Tls.Core.tls_version) option;
      (** Minimum and maximum TLS versions, if restricted. *)
  alpn_protocols : string list option;
      (** ALPN protocol list, if any. *)
}
18
(** Counters used to monitor a connection pool. *)
type pool_stats = {
  total_connections : int;
      (** Total connections created. *)
  active_connections : int;
      (** Connections currently considered active. *)
  idle_connections : int;
      (** Connections currently considered idle. *)
  total_requests : int;
      (** Total requests processed. *)
  cache_hits : int;
      (** Requests that found an existing connection record. *)
  cache_misses : int;
      (** Requests that required a new connection record. *)
  connection_failures : int;
      (** Failed connection attempts. *)
}
29
(** Tunable limits and timeouts for a connection pool. All durations are in
    seconds. *)
type pool_config = {
  max_connections : int;
      (** Maximum total connections. *)
  max_idle_connections : int;
      (** Maximum idle connections to keep. *)
  connection_timeout : float;
      (** Connection establishment timeout. *)
  idle_timeout : float;
      (** How long an unused connection is kept. *)
  max_lifetime : float;
      (** Maximum age of a connection. *)
  health_check_interval : float;
      (** Interval between health checks. *)
  enable_keep_alive : bool;
      (** Whether HTTP keep-alive is enabled. *)
}
40
(** Bookkeeping record describing one tracked connection. Timestamps are
    floating-point seconds as produced by [Unix.gettimeofday]. *)
type connection_info = {
  id : string;
      (** Unique connection ID. *)
  host : string;
      (** Target host. *)
  port : int;
      (** Target port. *)
  use_tls : bool;
      (** Whether the connection uses TLS. *)
  created_at : float;
      (** Creation timestamp. *)
  last_used : float;
      (** Timestamp of the most recent use. *)
  request_count : int;
      (** Number of requests served. *)
}
51
(** A connection pool: an immutable configuration plus mutable bookkeeping. *)
type t = {
  config : pool_config;
      (** Limits and timeouts this pool was created with. *)
  mutable connections : connection_info list;
      (** Connection records currently tracked by the pool. *)
  mutable stats : pool_stats;
      (** Running counters for the pool. *)
}
58
(** [default_config ()] returns the stock pool settings: at most 20
    connections (10 of them idle), a 10 second connect timeout, a 5 minute
    idle timeout, a 1 hour maximum lifetime, a 1 minute health-check interval,
    and keep-alive enabled. *)
let default_config () =
  let minute = 60.0 in
  {
    max_connections = 20;
    max_idle_connections = 10;
    connection_timeout = 10.0;
    idle_timeout = 5.0 *. minute;
    max_lifetime = 60.0 *. minute;
    health_check_interval = minute;
    enable_keep_alive = true;
  }
69
(** Process-wide sequence number consumed by [generate_connection_id]. *)
let connection_id_sequence = Atomic.make 0

(** [generate_connection_id ()] returns a fresh identifier of the form
    ["conn_<timestamp>_<n>"], where [<timestamp>] is the current
    [Unix.gettimeofday] value and [<n>] is a per-process sequence number.

    The sequence number (rather than a random draw) guarantees that two
    identifiers produced by the same process never collide, even when they are
    generated within the same clock tick. Connection records are replaced and
    filtered by ID, so a collision would silently drop an unrelated record. *)
let generate_connection_id () =
  let timestamp = Unix.gettimeofday () in
  let sequence = Atomic.fetch_and_add connection_id_sequence 1 in
  Printf.sprintf "conn_%f_%d" timestamp sequence
75
(** [create ?config ~sw ()] builds an empty pool with all counters at zero.

    @param config pool settings; defaults to [default_config ()].
    @param sw accepted for interface compatibility but currently unused. *)
let create ?(config = default_config ()) ~sw:_ () =
  let zeroed_stats =
    {
      total_connections = 0;
      active_connections = 0;
      idle_connections = 0;
      total_requests = 0;
      cache_hits = 0;
      cache_misses = 0;
      connection_failures = 0;
    }
  in
  { config; connections = []; stats = zeroed_stats }
93
(** [is_connection_healthy pool conn] is [true] when [conn] is both younger
    than the pool's [max_lifetime] and has been used more recently than the
    pool's [idle_timeout], measured against the current wall-clock time. *)
let is_connection_healthy pool conn =
  let now = Unix.gettimeofday () in
  let { max_lifetime; idle_timeout; _ } = pool.config in
  now -. conn.created_at < max_lifetime
  && now -. conn.last_used < idle_timeout
102
(** [find_connection pool ~host ~port ~use_tls] returns the first tracked
    connection that targets the same host, port and TLS setting and is still
    healthy, or [None] if there is none. *)
let find_connection pool ~host ~port ~use_tls =
  let same_endpoint candidate =
    candidate.host = host
    && candidate.port = port
    && candidate.use_tls = use_tls
  in
  pool.connections
  |> List.find_opt (fun candidate ->
         same_endpoint candidate && is_connection_healthy pool candidate)
111
(** [create_connection_info ~host ~port ~use_tls] builds a record for a new
    connection: a fresh ID, creation and last-use timestamps both set to the
    current time, and a request count of zero. *)
let create_connection_info ~host ~port ~use_tls =
  let created_at = Unix.gettimeofday () in
  let id = generate_connection_id () in
  {
    id;
    host;
    port;
    use_tls;
    created_at;
    last_used = created_at;
    request_count = 0;
  }
124
(** [use_connection pool conn] records one more use of [conn].

    The returned record is [conn] with [last_used] set to the current time and
    [request_count] incremented. It replaces any record with the same ID and is
    placed at the front of the pool's list. The pool's [cache_hits] and
    [total_requests] counters are each incremented. *)
let use_connection pool conn =
  let refreshed =
    {
      conn with
      last_used = Unix.gettimeofday ();
      request_count = succ conn.request_count;
    }
  in
  let others =
    List.filter (fun other -> other.id <> conn.id) pool.connections
  in
  pool.connections <- refreshed :: others;
  let { cache_hits; total_requests; _ } = pool.stats in
  pool.stats <-
    {
      pool.stats with
      cache_hits = cache_hits + 1;
      total_requests = total_requests + 1;
    };
  refreshed
146
(** [add_connection pool conn] puts [conn] at the front of the pool's list and
    increments the [total_connections], [cache_misses] and [total_requests]
    counters. *)
let add_connection pool conn =
  let previous = pool.stats in
  pool.connections <- conn :: pool.connections;
  pool.stats <-
    {
      previous with
      total_connections = previous.total_connections + 1;
      cache_misses = previous.cache_misses + 1;
      total_requests = previous.total_requests + 1;
    }
156
(** [http_request_with_pool pool ~env ~method_ ~uri ~headers ~body ~tls_config]
    performs one HTTP(S) request with cohttp-eio and records it in [pool]'s
    bookkeeping.

    The endpoint is derived from [uri]: any scheme other than ["http"]
    (including a missing scheme) is treated as TLS, and the port defaults to
    443 for TLS and 80 otherwise. A matching healthy connection record is
    reused (counted as a cache hit); otherwise a new record is added (counted
    as a cache miss) unless the pool already holds [max_connections] records.

    @param env Eio environment; only [env#net] is used.
    @param method_ HTTP method passed to [Cohttp_eio.Client.call].
    @param headers request headers as [(name, value)] pairs.
    @param body optional request body; [None] sends an empty body.
    @param tls_config optional TLS settings; only [authenticator] is consulted,
      falling back to [Ca_certs.authenticator ()] when absent.
    @return [Ok body] for a 2xx response (the body is read with a 10 MiB
      limit). Returns a transport [Error] carrying the status and body for any
      other status, or carrying the exception text if anything raised after
      the endpoint was resolved. Each such exception increments
      [connection_failures] exactly once.
    @raise Failure if [uri] has no host; this check runs before the error
      handler is installed.
    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled; cancellation
      is propagated rather than converted into [Error]. *)
let http_request_with_pool pool ~env ~method_ ~uri ~headers ~body ~tls_config =
  let host =
    match Uri.host uri with
    | Some h -> h
    | None -> failwith "No host in URI"
  in
  (* Anything that is not explicitly plain "http" is treated as TLS. *)
  let use_tls =
    match Uri.scheme uri with
    | Some "http" -> false
    | Some _ | None -> true
  in
  let port =
    match Uri.port uri with
    | Some p -> p
    | None -> if use_tls then 443 else 80
  in
  try
    (* Update the bookkeeping for this endpoint before issuing the request. *)
    let (_ : connection_info) =
      match find_connection pool ~host ~port ~use_tls with
      | Some existing_conn -> use_connection pool existing_conn
      | None ->
        if List.length pool.connections >= pool.config.max_connections then
          (* The handler at the bottom counts this failure; counting it here
             as well would record it twice. *)
          failwith
            ("Connection pool full: "
             ^ string_of_int pool.config.max_connections)
        else begin
          let new_conn = create_connection_info ~host ~port ~use_tls in
          add_connection pool new_conn;
          new_conn
        end
    in

    (* Build the TLS wrapper handed to cohttp-eio, if the endpoint needs one. *)
    let https_fn =
      if not use_tls then None
      else begin
        let authenticator =
          match tls_config with
          | Some { authenticator = Some auth; _ } -> auth
          | Some { authenticator = None; _ } | None ->
            (match Ca_certs.authenticator () with
             | Ok auth -> auth
             | Error (`Msg msg) ->
               failwith ("TLS authenticator error: " ^ msg))
        in
        let tls_config_obj =
          match Tls.Config.client ~authenticator () with
          | Ok config -> config
          | Error (`Msg msg) -> failwith ("TLS config error: " ^ msg)
        in
        Some
          (fun uri raw_flow ->
            let host =
              match Uri.host uri with
              | Some h -> h
              | None -> failwith "No host in URI for TLS"
            in
            match Domain_name.of_string host with
            | Error (`Msg msg) ->
              failwith ("Invalid hostname for TLS: " ^ msg)
            | Ok domain ->
              (match Domain_name.host domain with
               | Error (`Msg msg) ->
                 failwith ("Invalid host domain: " ^ msg)
               | Ok hostname ->
                 Tls_eio.client_of_flow tls_config_obj raw_flow ~host:hostname))
      end
    in

    Eio.Switch.run @@ fun sw ->
    let client = Cohttp_eio.Client.make ~https:https_fn env#net in

    let cohttp_headers =
      List.fold_left
        (fun hdrs (k, v) -> Cohttp.Header.add hdrs k v)
        (Cohttp.Header.init ())
        headers
    in

    let body_obj = Cohttp_eio.Body.of_string (Option.value body ~default:"") in

    let (response, response_body) =
      Cohttp_eio.Client.call ~sw client ~headers:cohttp_headers ~body:body_obj
        method_ uri
    in

    let status_code =
      Cohttp.Response.status response |> Cohttp.Code.code_of_status
    in
    let body_content =
      Eio.Buf_read.(parse_exn take_all) response_body
        ~max_size:(10 * 1024 * 1024)
    in

    if status_code >= 200 && status_code < 300 then
      Ok body_content
    else
      Error (Jmap.Error.transport
        (Printf.sprintf "HTTP error %d: %s" status_code body_content))

  with
  (* Cancellation is not a connection failure and must keep propagating. *)
  | Eio.Cancel.Cancelled _ as cancelled -> raise cancelled
  | exn ->
    pool.stats <- {
      pool.stats with
      connection_failures = pool.stats.connection_failures + 1;
    };
    Error (Jmap.Error.transport
      (Printf.sprintf "Connection error: %s" (Printexc.to_string exn)))
259
(** [cleanup_connections pool] drops unhealthy connection records and trims
    the idle ones.

    Healthy records used within the last second are classed as active and are
    all kept. The remaining healthy records are idle: if there are more than
    [max_idle_connections] of them, only the most recently used
    [max_idle_connections] are kept. The pool's list becomes the active records
    followed by the retained idle ones, and [total_connections],
    [active_connections] and [idle_connections] are set to the resulting
    counts. *)
let cleanup_connections pool =
  let now = Unix.gettimeofday () in
  let healthy = List.filter (is_connection_healthy pool) pool.connections in
  (* A record counts as active if it was used within the last second. *)
  let recently_used c = now -. c.last_used <= 1.0 in
  let active, idle = List.partition recently_used healthy in
  let idle_limit = pool.config.max_idle_connections in
  let retained_idle =
    if List.length idle > idle_limit then
      idle
      (* Most recently used first, then keep the first [idle_limit]. *)
      |> List.sort (fun a b -> Float.compare b.last_used a.last_used)
      |> List.filteri (fun rank _ -> rank < idle_limit)
    else idle
  in
  pool.connections <- active @ retained_idle;
  pool.stats <-
    {
      pool.stats with
      total_connections = List.length pool.connections;
      active_connections = List.length active;
      idle_connections = List.length retained_idle;
    }
296
(** [get_stats pool] runs [cleanup_connections pool] and then returns the
    pool's statistics, so the connection counts reflect the cleaned-up pool. *)
let get_stats pool =
  let () = cleanup_connections pool in
  pool.stats
301
(** [close pool] forgets every tracked connection record and resets the
    [total_connections], [active_connections] and [idle_connections] counters
    to zero. The other counters are left untouched. *)
let close pool =
  let cleared =
    {
      pool.stats with
      total_connections = 0;
      active_connections = 0;
      idle_connections = 0;
    }
  in
  pool.connections <- [];
  pool.stats <- cleared
311