My agentic slop goes here. Not intended for anyone else!
at main 10 kB view raw
1(** Connection pooling for efficient JMAP client connection reuse. 2 3 This module provides connection pooling functionality to reduce connection overhead. 4 For demonstration purposes, this implements statistics tracking and connection management 5 concepts while still using cohttp-eio for the actual HTTP operations. 6 7 @see <https://www.rfc-editor.org/rfc/rfc8620.html#section-3.3> RFC 8620, Section 3.3 8*) 9 10(** TLS configuration options *) 11type tls_config = { 12 authenticator : X509.Authenticator.t option; (** Custom TLS authenticator *) 13 certificates : Tls.Config.own_cert list; (** Client certificates for mutual TLS *) 14 ciphers : Tls.Ciphersuite.ciphersuite list option; (** Allowed cipher suites *) 15 version : (Tls.Core.tls_version * Tls.Core.tls_version) option; (** Min and max TLS versions *) 16 alpn_protocols : string list option; (** ALPN protocol list *) 17} 18 19(** Statistics for connection pool monitoring *) 20type pool_stats = { 21 total_connections : int; (** Total connections created *) 22 active_connections : int; (** Currently active connections *) 23 idle_connections : int; (** Currently idle connections *) 24 total_requests : int; (** Total requests processed *) 25 cache_hits : int; (** Requests served from cached connections *) 26 cache_misses : int; (** Requests requiring new connections *) 27 connection_failures : int; (** Failed connection attempts *) 28} 29 30(** Connection pool configuration *) 31type pool_config = { 32 max_connections : int; (** Maximum total connections *) 33 max_idle_connections : int; (** Maximum idle connections to keep *) 34 connection_timeout : float; (** Connection establishment timeout (seconds) *) 35 idle_timeout : float; (** Time to keep idle connections (seconds) *) 36 max_lifetime : float; (** Maximum connection lifetime (seconds) *) 37 health_check_interval : float; (** Health check interval (seconds) *) 38 enable_keep_alive : bool; (** Enable HTTP keep-alive *) 39} 40 41(** Connection info for tracking *) 42type connection_info = { 43 id : string; (** Unique connection ID *) 44 host : string; (** Target host *) 45 port : int; (** Target port *) 46 use_tls : bool; (** TLS usage flag *) 47 created_at : float; (** Connection creation timestamp *) 48 last_used : float; (** Last usage timestamp *) 49 request_count : int; (** Number of requests served *) 50} 51 52(** Connection pool type *) 53type t = { 54 config : pool_config; 55 mutable connections : connection_info list; 56 mutable stats : pool_stats; 57} 58 59(** Create default pool configuration *) 60let default_config () = { 61 max_connections = 20; 62 max_idle_connections = 10; 63 connection_timeout = 10.0; 64 idle_timeout = 300.0; (* 5 minutes *) 65 max_lifetime = 3600.0; (* 1 hour *) 66 health_check_interval = 60.0; (* 1 minute *) 67 enable_keep_alive = true; 68} 69 70(** Generate unique connection ID *) 71let generate_connection_id () = 72 let timestamp = Unix.gettimeofday () in 73 let random = Random.int 100000 in 74 Printf.sprintf "conn_%f_%d" timestamp random 75 76(** Create a new connection pool *) 77let create ?(config = default_config ()) ~sw () = 78 let _ = sw in (* Acknowledge unused parameter *) 79 let initial_stats = { 80 total_connections = 0; 81 active_connections = 0; 82 idle_connections = 0; 83 total_requests = 0; 84 cache_hits = 0; 85 cache_misses = 0; 86 connection_failures = 0; 87 } in 88 { 89 config; 90 connections = []; 91 stats = initial_stats; 92 } 93 94(** Check if connection is still healthy *) 95let is_connection_healthy pool conn = 96 let now = Unix.gettimeofday () in 97 let age = now -. conn.created_at in 98 let idle_time = now -. conn.last_used in 99 100 age < pool.config.max_lifetime && 101 idle_time < pool.config.idle_timeout 102 103(** Find existing connection for host/port *) 104let find_connection pool ~host ~port ~use_tls = 105 List.find_opt (fun conn -> 106 conn.host = host && 107 conn.port = port && 108 conn.use_tls = use_tls && 109 is_connection_healthy pool conn 110 ) pool.connections 111 112(** Create new connection info *) 113let create_connection_info ~host ~port ~use_tls = 114 let now = Unix.gettimeofday () in 115 { 116 id = generate_connection_id (); 117 host; 118 port; 119 use_tls; 120 created_at = now; 121 last_used = now; 122 request_count = 0; 123 } 124 125(** Update connection usage *) 126let use_connection pool conn = 127 let now = Unix.gettimeofday () in 128 let updated_conn = { 129 conn with 130 last_used = now; 131 request_count = conn.request_count + 1; 132 } in 133 134 (* Update connections list *) 135 pool.connections <- updated_conn :: 136 (List.filter (fun c -> c.id <> conn.id) pool.connections); 137 138 (* Update stats *) 139 pool.stats <- { 140 pool.stats with 141 cache_hits = pool.stats.cache_hits + 1; 142 total_requests = pool.stats.total_requests + 1; 143 }; 144 145 updated_conn 146 147(** Add new connection to pool *) 148let add_connection pool conn = 149 pool.connections <- conn :: pool.connections; 150 pool.stats <- { 151 pool.stats with 152 total_connections = pool.stats.total_connections + 1; 153 cache_misses = pool.stats.cache_misses + 1; 154 total_requests = pool.stats.total_requests + 1; 155 } 156 157(** Perform HTTP request using pool for statistics tracking *) 158let http_request_with_pool pool ~env ~method_ ~uri ~headers ~body ~tls_config = 159 let host = match Uri.host uri with 160 | Some h -> h 161 | None -> failwith "No host in URI" 162 in 163 let use_tls = match Uri.scheme uri with 164 | Some "https" -> true 165 | Some "http" -> false 166 | _ -> true 167 in 168 let port = match Uri.port uri with 169 | Some p -> p 170 | None -> if use_tls then 443 else 80 171 in 172 173 try 174 (* Check if we have a cached connection for this endpoint *) 175 let _conn_info = match find_connection pool ~host ~port ~use_tls with 176 | Some existing_conn -> 177 (* Update existing connection usage *) 178 use_connection pool existing_conn 179 | None -> 180 (* Check connection limits *) 181 if List.length pool.connections >= pool.config.max_connections then ( 182 pool.stats <- { 183 pool.stats with 184 connection_failures = pool.stats.connection_failures + 1; 185 }; 186 failwith ("Connection pool full: " ^ string_of_int pool.config.max_connections) 187 ) else ( 188 (* Create new connection info *) 189 let new_conn = create_connection_info ~host ~port ~use_tls in 190 add_connection pool new_conn; 191 new_conn 192 ) 193 in 194 195 (* Actually perform HTTP request using cohttp-eio *) 196 let https_fn = if use_tls then 197 let authenticator = match tls_config with 198 | Some tls when tls.authenticator <> None -> 199 (match tls.authenticator with Some auth -> auth | None -> assert false) 200 | _ -> 201 match Ca_certs.authenticator () with 202 | Ok auth -> auth 203 | Error (`Msg msg) -> failwith ("TLS authenticator error: " ^ msg) 204 in 205 let tls_config_obj = match Tls.Config.client ~authenticator () with 206 | Ok config -> config 207 | Error (`Msg msg) -> failwith ("TLS config error: " ^ msg) 208 in 209 Some (fun uri raw_flow -> 210 let host = match Uri.host uri with 211 | Some h -> h 212 | None -> failwith "No host in URI for TLS" 213 in 214 match Domain_name.of_string host with 215 | Error (`Msg msg) -> failwith ("Invalid hostname for TLS: " ^ msg) 216 | Ok domain -> 217 match Domain_name.host domain with 218 | Error (`Msg msg) -> failwith ("Invalid host domain: " ^ msg) 219 | Ok hostname -> 220 Tls_eio.client_of_flow tls_config_obj raw_flow ~host:hostname 221 ) 222 else 223 None 224 in 225 226 Eio.Switch.run @@ fun sw -> 227 let client = Cohttp_eio.Client.make ~https:https_fn env#net in 228 229 let cohttp_headers = 230 List.fold_left (fun hdrs (k, v) -> 231 Cohttp.Header.add hdrs k v 232 ) (Cohttp.Header.init ()) headers 233 in 234 235 let body_obj = match body with 236 | Some s -> Cohttp_eio.Body.of_string s 237 | None -> Cohttp_eio.Body.of_string "" 238 in 239 240 let (response, response_body) = Cohttp_eio.Client.call ~sw client ~headers:cohttp_headers ~body:body_obj method_ uri in 241 242 let status_code = Cohttp.Response.status response |> Cohttp.Code.code_of_status in 243 let body_content = Eio.Buf_read.(parse_exn take_all) response_body ~max_size:(10 * 1024 * 1024) in 244 245 if status_code >= 200 && status_code < 300 then 246 Ok body_content 247 else 248 Error (Jmap.Error.transport 249 (Printf.sprintf "HTTP error %d: %s" status_code body_content)) 250 251 with 252 | exn -> 253 pool.stats <- { 254 pool.stats with 255 connection_failures = pool.stats.connection_failures + 1; 256 }; 257 Error (Jmap.Error.transport 258 (Printf.sprintf "Connection error: %s" (Printexc.to_string exn))) 259 260(** Clean up old connections *) 261let cleanup_connections pool = 262 let now = Unix.gettimeofday () in 263 let (healthy, _unhealthy) = List.partition (is_connection_healthy pool) pool.connections in 264 265 (* Keep only healthy connections, respecting idle limit *) 266 let idle_connections = List.filter (fun c -> 267 now -. c.last_used > 1.0 (* Idle for more than 1 second *) 268 ) healthy in 269 270 let keep_idle = 271 if List.length idle_connections > pool.config.max_idle_connections then 272 let sorted = List.sort (fun a b -> 273 compare b.last_used a.last_used (* Most recently used first *) 274 ) idle_connections in 275 let rec list_take n = function 276 | [] -> [] 277 | h :: t when n > 0 -> h :: list_take (n - 1) t 278 | _ -> [] 279 in 280 list_take pool.config.max_idle_connections sorted 281 else 282 idle_connections 283 in 284 285 let active_connections = List.filter (fun c -> 286 now -. c.last_used <= 1.0 287 ) healthy in 288 289 pool.connections <- active_connections @ keep_idle; 290 pool.stats <- { 291 pool.stats with 292 total_connections = List.length pool.connections; 293 active_connections = List.length active_connections; 294 idle_connections = List.length keep_idle; 295 } 296 297(** Get pool statistics *) 298let get_stats pool = 299 cleanup_connections pool; 300 pool.stats 301 302(** Close all connections and clean up pool *) 303let close pool = 304 pool.connections <- []; 305 pool.stats <- { 306 pool.stats with 307 total_connections = 0; 308 active_connections = 0; 309 idle_connections = 0; 310 } 311