My agentic slop goes here. Not intended for anyone else!

more

+315 -139
stack/conpool/lib/conpool.ml
···
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
-
(** {1 Public Types} *)
-
type endpoint = {
-
host : string;
-
port : int;
-
}
-
type tls_config = {
-
config : Tls.Config.client;
-
servername : string option;
-
}
-
type config = {
-
max_connections_per_endpoint : int;
-
max_idle_time : float;
-
max_connection_lifetime : float;
-
max_connection_uses : int option;
-
health_check : ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
-
connect_timeout : float option;
-
connect_retry_count : int;
-
connect_retry_delay : float;
-
on_connection_created : (endpoint -> unit) option;
-
on_connection_closed : (endpoint -> unit) option;
-
on_connection_reused : (endpoint -> unit) option;
-
}
-
let default_config = {
-
max_connections_per_endpoint = 10;
-
max_idle_time = 60.0;
-
max_connection_lifetime = 300.0;
-
max_connection_uses = None;
-
health_check = None;
-
connect_timeout = Some 10.0;
-
connect_retry_count = 3;
-
connect_retry_delay = 0.1;
-
on_connection_created = None;
-
on_connection_closed = None;
-
on_connection_reused = None;
-
}
-
type endpoint_stats = {
-
active : int;
-
idle : int;
-
total_created : int;
-
total_reused : int;
-
total_closed : int;
-
errors : int;
-
}
(** {1 Internal Types} *)
-
type connection_metadata = {
-
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
-
created_at : float;
-
mutable last_used : float;
-
mutable use_count : int;
-
endpoint : endpoint;
-
}
-
-
and endpoint_stats_mutable = {
mutable active : int;
mutable idle : int;
mutable total_created : int;
···
mutable errors : int;
}
-
and endpoint_pool = {
-
pool : connection_metadata Eio.Pool.t;
stats : endpoint_stats_mutable;
mutex : Eio.Mutex.t;
}
-
and ('clock, 'net) t = {
sw : Eio.Switch.t;
net : 'net;
clock : 'clock;
-
config : config;
-
tls : tls_config option;
-
endpoints : (endpoint, endpoint_pool) Hashtbl.t;
endpoints_mutex : Eio.Mutex.t;
}
-
type connection = connection_metadata
-
(** {1 Endpoint Hashing and Equality} *)
-
-
module Endpoint = struct
-
type t = endpoint
-
-
let equal e1 e2 =
-
String.equal e1.host e2.host && e1.port = e2.port
-
-
let hash e =
-
Hashtbl.hash (e.host, e.port)
-
end
-
-
module EndpointTbl = Hashtbl.Make(Endpoint)
(** {1 Helper Functions} *)
···
errors = 0;
}
-
let snapshot_stats (stats : endpoint_stats_mutable) : endpoint_stats = {
active = stats.active;
idle = stats.idle;
total_created = stats.total_created;
···
let resolve_endpoint pool endpoint =
(* Simple DNS resolution - returns socket address *)
-
let addrs = Eio.Net.getaddrinfo_stream pool.net endpoint.host ~service:(string_of_int endpoint.port) in
match addrs with
-
| addr :: _ -> addr
| [] ->
-
failwith (Printf.sprintf "Failed to resolve hostname: %s" endpoint.host)
(** {1 Connection Creation with Retry} *)
let rec create_connection_with_retry pool endpoint attempt =
-
if attempt > pool.config.connect_retry_count then
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
-
endpoint.host endpoint.port pool.config.connect_retry_count);
try
let addr = resolve_endpoint pool endpoint in
(* Connect with optional timeout *)
let socket =
···
Eio.Net.connect ~sw:pool.sw pool.net addr
in
-
(* Wrap with TLS if configured - use coercion to upcast *)
let flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t = match pool.tls with
| None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
| Some tls_cfg ->
-
let host = match tls_cfg.servername with
| Some name -> Domain_name.(host_exn (of_string_exn name))
-
| None -> Domain_name.(host_exn (of_string_exn endpoint.host))
in
-
let tls_flow = Tls_eio.client_of_flow ~host tls_cfg.config socket in
(tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
in
let now = get_time pool in
{
-
flow;
created_at = now;
last_used = now;
use_count = 0;
···
with
| Eio.Time.Timeout ->
(* Exponential backoff *)
-
let clock = (pool.clock :> float Eio.Time.clock_ty Eio.Resource.t) in
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
-
Eio.Time.sleep clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
| e ->
(* Other errors - retry with backoff *)
-
Logs.warn (fun m -> m "Connection attempt %d to %s:%d failed: %s"
-
attempt endpoint.host endpoint.port (Printexc.to_string e));
if attempt < pool.config.connect_retry_count then (
-
let clock = (pool.clock :> float Eio.Time.clock_ty Eio.Resource.t) in
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
-
Eio.Time.sleep clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
) else
raise e
···
let now = get_time pool in
(* Check age *)
-
let age = now -. conn.created_at in
-
if age > pool.config.max_connection_lifetime then
false
(* Check idle time *)
-
else if (now -. conn.last_used) > pool.config.max_idle_time then
false
(* Check use count *)
else if (match pool.config.max_connection_uses with
-
| Some max -> conn.use_count >= max
-
| None -> false) then
false
(* Optional: custom health check *)
else if (match pool.config.health_check with
| Some check ->
-
(try not (check conn.flow)
-
with _ -> true) (* Exception in health check = unhealthy *)
| None -> false) then
false
(* Optional: check if socket still connected *)
else if check_readable then
try
-
(* Try to peek at the connection without consuming data *)
-
(* This is tricky with Eio - we'll use a simple approach *)
-
(* Just check if the flow is still valid *)
-
(* For now, skip this check as it's complex to implement correctly *)
true
with
| _ -> false
-
else
true
(** {1 Internal Pool Operations} *)
let close_internal pool conn =
Eio.Cancel.protect (fun () ->
try
-
Eio.Flow.close conn.flow
with _ -> ()
);
(* Call hook if configured *)
-
Option.iter (fun f -> f conn.endpoint) pool.config.on_connection_closed
let get_or_create_endpoint_pool pool endpoint =
-
Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
-
match Hashtbl.find_opt pool.endpoints endpoint with
-
| Some ep_pool -> ep_pool
-
| None ->
-
(* Need to create - upgrade to write lock *)
-
(* For simplicity, we'll just use a single mutex *)
-
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
-
(* Check again in case another fiber created it *)
-
match Hashtbl.find_opt pool.endpoints endpoint with
-
| Some ep_pool -> ep_pool
-
| None ->
(* Create new endpoint pool *)
let stats = create_mutable_stats () in
let mutex = Eio.Mutex.create () in
let eio_pool = Eio.Pool.create
pool.config.max_connections_per_endpoint
~validate:(fun conn ->
(* Called before reusing from pool *)
let healthy = is_healthy pool ~check_readable:false conn in
if healthy then (
(* Update stats for reuse *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.total_reused <- stats.total_reused + 1
···
(* Run health check if configured *)
match pool.config.health_check with
| Some check ->
-
(try check conn.flow
with _ -> false)
| None -> true
-
) else
false
)
~dispose:(fun conn ->
(* Called when removing from pool *)
···
)
)
(fun () ->
-
(* Factory: create new connection *)
try
let conn = create_connection pool endpoint in
(* Update stats *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
···
conn
with e ->
(* Update error stats *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.errors <- stats.errors + 1
···
)
in
let ep_pool = {
pool = eio_pool;
stats;
···
} in
Hashtbl.add pool.endpoints endpoint ep_pool;
ep_pool
)
-
)
(** {1 Public API - Pool Creation} *)
-
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = default_config) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
let pool = {
sw;
net;
···
(* Auto-cleanup on switch release *)
Eio.Switch.on_release sw (fun () ->
Eio.Cancel.protect (fun () ->
(* Close all idle connections - active ones will be cleaned up by switch *)
Hashtbl.iter (fun _endpoint _ep_pool ->
(* Connections are bound to the switch and will be auto-closed *)
···
(** {1 Public API - Connection Management} *)
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint f =
let ep_pool = get_or_create_endpoint_pool pool endpoint in
(* Increment active count *)
···
(* Decrement active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
ep_pool.stats.active <- ep_pool.stats.active - 1
-
)
)
(fun () ->
(* Use Eio.Pool for resource management *)
Eio.Pool.use ep_pool.pool (fun conn ->
(* Update last used time and use count *)
conn.last_used <- get_time pool;
conn.use_count <- conn.use_count + 1;
···
result
with e ->
(* Error - close connection so it won't be reused *)
close_internal pool conn;
(* Update error stats *)
···
)
)
-
let acquire (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
-
let _ep_pool = get_or_create_endpoint_pool pool endpoint in
-
(* This is more complex - we need to manually manage the pool *)
-
(* For now, we'll use a simplified version that wraps Eio.Pool.use *)
-
(* In practice, this would need custom implementation *)
-
failwith "acquire: manual connection management not yet implemented - use with_connection instead"
let release (_pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) _conn =
failwith "release: manual connection management not yet implemented - use with_connection instead"
···
let close (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
close_internal pool conn
-
let get_flow conn =
-
conn.flow
-
let validate_and_release (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
if is_healthy pool conn then
release pool conn
···
| None ->
(* No pool for this endpoint yet *)
{
-
active = 0;
idle = 0;
total_created = 0;
total_reused = 0;
···
(endpoint, stats) :: acc
) pool.endpoints []
)
-
-
let pp_stats fmt (stats : endpoint_stats) =
-
Format.fprintf fmt "@[<v>Active: %d@,Idle: %d@,Created: %d@,Reused: %d@,Closed: %d@,Errors: %d@]"
-
stats.active
-
stats.idle
-
stats.total_created
-
stats.total_reused
-
stats.total_closed
-
stats.errors
(** {1 Public API - Pool Management} *)
···
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
+
let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
(** {1 Public Module Types} *)
+
+
module Endpoint = struct
+
type t = {
+
host : string;
+
port : int;
+
}
+
+
let make ~host ~port = { host; port }
+
+
let host t = t.host
+
let port t = t.port
+
+
let pp fmt t =
+
Format.fprintf fmt "%s:%d" t.host t.port
+
+
let equal t1 t2 =
+
String.equal t1.host t2.host && t1.port = t2.port
+
+
let hash t =
+
Hashtbl.hash (t.host, t.port)
+
end
+
+
module Tls_config = struct
+
type t = {
+
config : Tls.Config.client;
+
servername : string option;
+
}
+
+
let make ~config ?servername () = { config; servername }
+
+
let config t = t.config
+
let servername t = t.servername
+
+
let pp fmt t =
+
Format.fprintf fmt "TLS(servername=%s)"
+
(match t.servername with Some s -> s | None -> "<default>")
+
end
+
+
module Connection = struct
+
type t = {
+
flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t;
+
created_at : float;
+
mutable last_used : float;
+
mutable use_count : int;
+
endpoint : Endpoint.t;
+
}
+
+
let get_flow t = t.flow
+
let endpoint t = t.endpoint
+
let created_at t = t.created_at
+
let last_used t = t.last_used
+
let use_count t = t.use_count
+
let pp fmt t =
+
Format.fprintf fmt "Connection(endpoint=%a, created=%.2f, last_used=%.2f, uses=%d)"
+
Endpoint.pp t.endpoint
+
t.created_at
+
t.last_used
+
t.use_count
+
end
+
+
module Config = struct
+
type t = {
+
max_connections_per_endpoint : int;
+
max_idle_time : float;
+
max_connection_lifetime : float;
+
max_connection_uses : int option;
+
health_check : ([`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
+
connect_timeout : float option;
+
connect_retry_count : int;
+
connect_retry_delay : float;
+
on_connection_created : (Endpoint.t -> unit) option;
+
on_connection_closed : (Endpoint.t -> unit) option;
+
on_connection_reused : (Endpoint.t -> unit) option;
+
}
+
+
let make
+
?(max_connections_per_endpoint = 10)
+
?(max_idle_time = 60.0)
+
?(max_connection_lifetime = 300.0)
+
?max_connection_uses
+
?health_check
+
?(connect_timeout = 10.0)
+
?(connect_retry_count = 3)
+
?(connect_retry_delay = 0.1)
+
?on_connection_created
+
?on_connection_closed
+
?on_connection_reused
+
() =
+
{
+
max_connections_per_endpoint;
+
max_idle_time;
+
max_connection_lifetime;
+
max_connection_uses;
+
health_check;
+
connect_timeout = Some connect_timeout;
+
connect_retry_count;
+
connect_retry_delay;
+
on_connection_created;
+
on_connection_closed;
+
on_connection_reused;
+
}
+
+
let default = make ()
+
+
let max_connections_per_endpoint t = t.max_connections_per_endpoint
+
let max_idle_time t = t.max_idle_time
+
let max_connection_lifetime t = t.max_connection_lifetime
+
let max_connection_uses t = t.max_connection_uses
+
let health_check t = t.health_check
+
let connect_timeout t = t.connect_timeout
+
let connect_retry_count t = t.connect_retry_count
+
let connect_retry_delay t = t.connect_retry_delay
+
let pp fmt t =
+
Format.fprintf fmt
+
"@[<v>Config:@,\
+
- max_connections_per_endpoint: %d@,\
+
- max_idle_time: %.1fs@,\
+
- max_connection_lifetime: %.1fs@,\
+
- max_connection_uses: %s@,\
+
- connect_timeout: %s@,\
+
- connect_retry_count: %d@,\
+
- connect_retry_delay: %.2fs@]"
+
t.max_connections_per_endpoint
+
t.max_idle_time
+
t.max_connection_lifetime
+
(match t.max_connection_uses with Some n -> string_of_int n | None -> "unlimited")
+
(match t.connect_timeout with Some f -> Printf.sprintf "%.1fs" f | None -> "none")
+
t.connect_retry_count
+
t.connect_retry_delay
+
end
+
module Stats = struct
+
type t = {
+
active : int;
+
idle : int;
+
total_created : int;
+
total_reused : int;
+
total_closed : int;
+
errors : int;
+
}
+
let active t = t.active
+
let idle t = t.idle
+
let total_created t = t.total_created
+
let total_reused t = t.total_reused
+
let total_closed t = t.total_closed
+
let errors t = t.errors
+
let pp fmt t =
+
Format.fprintf fmt
+
"@[<v>Stats:@,\
+
- Active: %d@,\
+
- Idle: %d@,\
+
- Created: %d@,\
+
- Reused: %d@,\
+
- Closed: %d@,\
+
- Errors: %d@]"
+
t.active
+
t.idle
+
t.total_created
+
t.total_reused
+
t.total_closed
+
t.errors
+
end
(** {1 Internal Types} *)
+
type endpoint_stats_mutable = {
mutable active : int;
mutable idle : int;
mutable total_created : int;
···
mutable errors : int;
}
+
type endpoint_pool = {
+
pool : Connection.t Eio.Pool.t;
stats : endpoint_stats_mutable;
mutex : Eio.Mutex.t;
}
+
type ('clock, 'net) t = {
sw : Eio.Switch.t;
net : 'net;
clock : 'clock;
+
config : Config.t;
+
tls : Tls_config.t option;
+
endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t;
endpoints_mutex : Eio.Mutex.t;
}
+
(** {1 Endpoint Hashing} *)
+
module EndpointTbl = Hashtbl.Make(struct
+
type t = Endpoint.t
+
let equal = Endpoint.equal
+
let hash = Endpoint.hash
+
end)
(** {1 Helper Functions} *)
···
errors = 0;
}
+
let snapshot_stats (stats : endpoint_stats_mutable) : Stats.t = {
active = stats.active;
idle = stats.idle;
total_created = stats.total_created;
···
let resolve_endpoint pool endpoint =
(* Simple DNS resolution - returns socket address *)
+
Log.debug (fun m -> m "Resolving %a..." Endpoint.pp endpoint);
+
let addrs = Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) ~service:(string_of_int (Endpoint.port endpoint)) in
+
Log.debug (fun m -> m "Got address list for %a" Endpoint.pp endpoint);
match addrs with
+
| addr :: _ ->
+
Log.debug (fun m -> m "Resolved %a to %a"
+
Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr);
+
addr
| [] ->
+
Log.err (fun m -> m "Failed to resolve hostname: %s" (Endpoint.host endpoint));
+
failwith (Printf.sprintf "Failed to resolve hostname: %s" (Endpoint.host endpoint))
(** {1 Connection Creation with Retry} *)
let rec create_connection_with_retry pool endpoint attempt =
+
if attempt > pool.config.connect_retry_count then begin
+
Log.err (fun m -> m "Failed to connect to %a after %d attempts"
+
Endpoint.pp endpoint pool.config.connect_retry_count);
failwith (Printf.sprintf "Failed to connect to %s:%d after %d attempts"
+
(Endpoint.host endpoint) (Endpoint.port endpoint) pool.config.connect_retry_count)
+
end;
+
+
Log.debug (fun m -> m "Connecting to %a (attempt %d/%d)"
+
Endpoint.pp endpoint attempt pool.config.connect_retry_count);
try
let addr = resolve_endpoint pool endpoint in
+
Log.debug (fun m -> m "Resolved %a to address" Endpoint.pp endpoint);
(* Connect with optional timeout *)
let socket =
···
Eio.Net.connect ~sw:pool.sw pool.net addr
in
+
Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
+
let flow : [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t = match pool.tls with
| None -> (socket :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
| Some tls_cfg ->
+
Log.debug (fun m -> m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
+
let host = match Tls_config.servername tls_cfg with
| Some name -> Domain_name.(host_exn (of_string_exn name))
+
| None -> Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
in
+
let tls_flow = Tls_eio.client_of_flow ~host (Tls_config.config tls_cfg) socket in
+
Log.info (fun m -> m "TLS connection established to %a" Endpoint.pp endpoint);
(tls_flow :> [`Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t)
in
let now = get_time pool in
+
Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint);
{
+
Connection.flow;
created_at = now;
last_used = now;
use_count = 0;
···
with
| Eio.Time.Timeout ->
+
Log.warn (fun m -> m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt);
(* Exponential backoff *)
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
+
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
| e ->
(* Other errors - retry with backoff *)
+
Log.warn (fun m -> m "Connection attempt %d to %a failed: %s"
+
attempt Endpoint.pp endpoint (Printexc.to_string e));
if attempt < pool.config.connect_retry_count then (
let delay = pool.config.connect_retry_delay *. (2.0 ** float_of_int (attempt - 1)) in
+
Eio.Time.sleep pool.clock delay;
create_connection_with_retry pool endpoint (attempt + 1)
) else
raise e
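The retry delay above doubles on each attempt (exponential backoff). A minimal sketch of the schedule this formula produces, assuming the default connect_retry_delay of 0.1s and connect_retry_count of 3, so the waits are 0.1s, 0.2s and 0.4s before giving up:

(* Backoff schedule implied by the formula above; [backoff_delays] is an
   illustrative helper, not part of the library. *)
let backoff_delays ~retry_delay ~retry_count =
  List.init retry_count (fun i -> retry_delay *. (2.0 ** float_of_int i))

(* backoff_delays ~retry_delay:0.1 ~retry_count:3 = [0.1; 0.2; 0.4] *)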
···
let now = get_time pool in
(* Check age *)
+
let age = now -. Connection.created_at conn in
+
if age > pool.config.max_connection_lifetime then begin
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) age pool.config.max_connection_lifetime);
false
+
end
(* Check idle time *)
+
else if (now -. Connection.last_used conn) > pool.config.max_idle_time then begin
+
let idle_time = now -. Connection.last_used conn in
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)"
+
Endpoint.pp (Connection.endpoint conn) idle_time pool.config.max_idle_time);
false
+
end
(* Check use count *)
else if (match pool.config.max_connection_uses with
+
| Some max -> Connection.use_count conn >= max
+
| None -> false) then begin
+
Log.debug (fun m -> m "Connection to %a unhealthy: exceeded max use count (%d)"
+
Endpoint.pp (Connection.endpoint conn) (Connection.use_count conn));
false
+
end
(* Optional: custom health check *)
else if (match pool.config.health_check with
| Some check ->
+
(try
+
let healthy = check (Connection.get_flow conn) in
+
if not healthy then
+
Log.debug (fun m -> m "Connection to %a failed custom health check"
+
Endpoint.pp (Connection.endpoint conn));
+
not healthy
+
with e ->
+
Log.debug (fun m -> m "Connection to %a health check raised exception: %s"
+
Endpoint.pp (Connection.endpoint conn) (Printexc.to_string e));
+
true) (* Exception in health check = unhealthy *)
| None -> false) then
false
(* Optional: check if socket still connected *)
else if check_readable then
try
+
(* TODO avsm: a sockopt for this? *)
true
with
| _ -> false
+
else begin
+
Log.debug (fun m -> m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)"
+
Endpoint.pp (Connection.endpoint conn)
+
age
+
(now -. Connection.last_used conn)
+
(Connection.use_count conn));
true
+
end
(** {1 Internal Pool Operations} *)
let close_internal pool conn =
+
Log.debug (fun m -> m "Closing connection to %a (age=%.2fs, uses=%d)"
+
Endpoint.pp (Connection.endpoint conn)
+
(get_time pool -. Connection.created_at conn)
+
(Connection.use_count conn));
+
Eio.Cancel.protect (fun () ->
try
+
Eio.Flow.close (Connection.get_flow conn)
with _ -> ()
);
(* Call hook if configured *)
+
Option.iter (fun f -> f (Connection.endpoint conn)) pool.config.on_connection_closed
let get_or_create_endpoint_pool pool endpoint =
+
Log.debug (fun m -> m "Getting or creating endpoint pool for %a" Endpoint.pp endpoint);
+
+
(* First try with read lock *)
+
match Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
+
Hashtbl.find_opt pool.endpoints endpoint
+
) with
+
| Some ep_pool ->
+
Log.debug (fun m -> m "Found existing endpoint pool for %a" Endpoint.pp endpoint);
+
ep_pool
+
| None ->
+
Log.debug (fun m -> m "No existing pool, need to create for %a" Endpoint.pp endpoint);
+
(* Need to create - use write lock *)
+
Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
+
(* Check again in case another fiber created it *)
+
match Hashtbl.find_opt pool.endpoints endpoint with
+
| Some ep_pool ->
+
Log.debug (fun m -> m "Another fiber created pool for %a" Endpoint.pp endpoint);
+
ep_pool
+
| None ->
(* Create new endpoint pool *)
let stats = create_mutable_stats () in
let mutex = Eio.Mutex.create () in
+
Log.info (fun m -> m "Creating new endpoint pool for %a (max_connections=%d)"
+
Endpoint.pp endpoint pool.config.max_connections_per_endpoint);
+
+
Log.debug (fun m -> m "About to create Eio.Pool for %a" Endpoint.pp endpoint);
+
let eio_pool = Eio.Pool.create
pool.config.max_connections_per_endpoint
~validate:(fun conn ->
+
Log.debug (fun m -> m "Validate called for connection to %a" Endpoint.pp endpoint);
(* Called before reusing from pool *)
let healthy = is_healthy pool ~check_readable:false conn in
if healthy then (
+
Log.debug (fun m -> m "Reusing connection to %a from pool" Endpoint.pp endpoint);
+
(* Update stats for reuse *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.total_reused <- stats.total_reused + 1
···
(* Run health check if configured *)
match pool.config.health_check with
| Some check ->
+
(try check (Connection.get_flow conn)
with _ -> false)
| None -> true
+
) else begin
+
Log.debug (fun m -> m "Connection to %a failed validation, creating new one" Endpoint.pp endpoint);
false
+
end
)
~dispose:(fun conn ->
(* Called when removing from pool *)
···
)
)
(fun () ->
+
Log.debug (fun m -> m "Factory function called for %a" Endpoint.pp endpoint);
try
let conn = create_connection pool endpoint in
+
+
Log.debug (fun m -> m "Connection created successfully for %a" Endpoint.pp endpoint);
(* Update stats *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
···
conn
with e ->
+
Log.err (fun m -> m "Factory function failed for %a: %s"
+
Endpoint.pp endpoint (Printexc.to_string e));
(* Update error stats *)
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
stats.errors <- stats.errors + 1
···
)
in
+
Log.debug (fun m -> m "Eio.Pool created successfully for %a" Endpoint.pp endpoint);
+
let ep_pool = {
pool = eio_pool;
stats;
···
} in
Hashtbl.add pool.endpoints endpoint ep_pool;
+
Log.debug (fun m -> m "Endpoint pool added to hashtable for %a" Endpoint.pp endpoint);
ep_pool
)
(** {1 Public API - Pool Creation} *)
+
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls ?(config = Config.default) () : ('clock Eio.Time.clock, 'net Eio.Net.t) t =
+
Log.info (fun m -> m "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, max_lifetime=%.1fs)"
+
config.max_connections_per_endpoint
+
config.max_idle_time
+
config.max_connection_lifetime);
+
let pool = {
sw;
net;
···
(* Auto-cleanup on switch release *)
Eio.Switch.on_release sw (fun () ->
Eio.Cancel.protect (fun () ->
+
Log.info (fun m -> m "Closing connection pool");
(* Close all idle connections - active ones will be cleaned up by switch *)
Hashtbl.iter (fun _endpoint _ep_pool ->
(* Connections are bound to the switch and will be auto-closed *)
···
(** {1 Public API - Connection Management} *)
let with_connection (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint f =
+
Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
let ep_pool = get_or_create_endpoint_pool pool endpoint in
(* Increment active count *)
···
(* Decrement active count *)
Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () ->
ep_pool.stats.active <- ep_pool.stats.active - 1
+
);
+
Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)
)
(fun () ->
(* Use Eio.Pool for resource management *)
Eio.Pool.use ep_pool.pool (fun conn ->
+
Log.debug (fun m -> m "Using connection to %a (uses=%d)"
+
Endpoint.pp endpoint (Connection.use_count conn));
+
(* Update last used time and use count *)
conn.last_used <- get_time pool;
conn.use_count <- conn.use_count + 1;
···
result
with e ->
(* Error - close connection so it won't be reused *)
+
Log.warn (fun m -> m "Error using connection to %a: %s"
+
Endpoint.pp endpoint (Printexc.to_string e));
close_internal pool conn;
(* Update error stats *)
···
)
)
+
let acquire ~sw (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) endpoint =
+
(* Create a connection bound to the provided switch *)
+
let conn = create_connection pool endpoint in
+
(* Register cleanup on switch *)
+
Eio.Switch.on_release sw (fun () ->
+
close_internal pool conn
+
);
+
+
conn
let release (_pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) _conn =
failwith "release: manual connection management not yet implemented - use with_connection instead"
···
let close (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
close_internal pool conn
let validate_and_release (pool : ('clock Eio.Time.clock, 'net Eio.Net.t) t) conn =
if is_healthy pool conn then
release pool conn
···
| None ->
(* No pool for this endpoint yet *)
{
+
Stats.active = 0;
idle = 0;
total_created = 0;
total_reused = 0;
···
(endpoint, stats) :: acc
) pool.endpoints []
)
(** {1 Public API - Pool Management} *)
+159 -82
stack/conpool/lib/conpool.mli
···
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
(** {1 Core Types} *)
-
type endpoint = {
-
host : string;
-
port : int;
-
}
-
(** Network endpoint identified by host and port.
-
TLS configuration is per-pool, not per-endpoint. *)
-
type tls_config = {
-
config : Tls.Config.client;
-
(** TLS client configuration *)
-
servername : string option;
-
(** Optional SNI server name override. If None, uses endpoint.host *)
-
}
-
(** TLS configuration applied to all connections in a pool *)
-
type connection
-
(** Opaque connection handle with metadata *)
-
type ('clock, 'net) t
-
(** Connection pool managing multiple endpoints, parameterized by clock and network types *)
-
type config = {
-
max_connections_per_endpoint : int;
-
(** Maximum connections per (host, port) endpoint. Default: 10 *)
-
max_idle_time : float;
-
(** Maximum time (seconds) a connection can be idle before closure. Default: 60.0 *)
-
max_connection_lifetime : float;
-
(** Maximum lifetime (seconds) of any connection. Default: 300.0 *)
-
max_connection_uses : int option;
-
(** Maximum number of times a connection can be reused. None = unlimited. Default: None *)
-
health_check : ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option;
-
(** Optional health check function. Called before reusing idle connection. Default: None *)
-
connect_timeout : float option;
-
(** Timeout for establishing new connections. Default: Some 10.0 *)
-
connect_retry_count : int;
-
(** Number of times to retry connection on failure. Default: 3 *)
-
connect_retry_delay : float;
-
(** Initial delay between retries in seconds, uses exponential backoff. Default: 0.1 *)
-
on_connection_created : (endpoint -> unit) option;
-
(** Hook called when new connection created. Default: None *)
-
on_connection_closed : (endpoint -> unit) option;
-
(** Hook called when connection closed. Default: None *)
-
on_connection_reused : (endpoint -> unit) option;
-
(** Hook called when connection reused from pool. Default: None *)
-
}
(** Pool configuration *)
-
val default_config : config
-
(** Sensible defaults for most use cases *)
-
(** {1 Pool Creation} *)
val create :
sw:Eio.Switch.t ->
net:'net Eio.Net.t ->
clock:'clock Eio.Time.clock ->
-
?tls:tls_config ->
-
?config:config ->
unit -> ('clock Eio.Time.clock, 'net Eio.Net.t) t
(** Create connection pool bound to switch.
All connections will be closed when switch is released.
···
@param net Network interface for creating connections
@param clock Clock for timeouts and time-based validation
@param tls Optional TLS configuration applied to all connections
-
@param config Optional pool configuration (uses default_config if not provided) *)
(** {1 Connection Acquisition & Release} *)
val with_connection :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
endpoint ->
([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> 'a) ->
'a
(** Acquire connection, use it, automatically release.
···
Example:
{[
Conpool.with_connection pool endpoint (fun conn ->
(* Use conn for HTTP request, Redis command, etc. *)
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
···
*)
val acquire :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
endpoint ->
-
connection
-
(** Manually acquire connection. Must call [release] or [close] later.
-
Use [with_connection] instead unless you need explicit control. *)
val release :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
connection ->
unit
(** Return connection to pool. Connection must be in clean state.
If connection is unhealthy, call [close] instead. *)
val close :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
connection ->
unit
(** Close connection immediately, remove from pool. *)
-
-
val get_flow :
-
connection ->
-
[ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t
-
(** Extract underlying Eio flow from connection. *)
(** {1 Connection Validation} *)
val is_healthy :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
?check_readable:bool ->
-
connection ->
bool
(** Check if connection is healthy.
···
val validate_and_release :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
connection ->
unit
(** Validate connection health, then release to pool if healthy or close if not.
Equivalent to: if is_healthy pool conn then release pool conn else close pool conn *)
(** {1 Statistics & Monitoring} *)
-
type endpoint_stats = {
-
active : int; (** Connections currently in use *)
-
idle : int; (** Connections in pool waiting to be reused *)
-
total_created : int; (** Total connections created (lifetime) *)
-
total_reused : int; (** Total times connections were reused *)
-
total_closed : int; (** Total connections closed *)
-
errors : int; (** Total connection errors *)
-
}
-
(** Statistics for a specific endpoint *)
-
val stats :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
endpoint ->
-
endpoint_stats
(** Get statistics for specific endpoint *)
val all_stats :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
(endpoint * endpoint_stats) list
(** Get statistics for all endpoints in pool *)
-
val pp_stats :
-
Format.formatter ->
-
endpoint_stats ->
-
unit
-
(** Pretty-print endpoint statistics *)
-
(** {1 Pool Management} *)
val close_idle_connections :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
endpoint ->
unit
(** Close all idle connections for endpoint (keeps active ones) *)
val close_all_connections :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
-
endpoint ->
unit
(** Close all connections for endpoint (blocks until active ones released) *)
···
(** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *)
+
(** {1 Logging} *)
+
+
val src : Logs.Src.t
+
(** Logs source for conpool. Configure logging with:
+
{[
+
Logs.Src.set_level Conpool.src (Some Logs.Debug);
+
Logs.set_reporter (Logs_fmt.reporter ());
+
]}
+
*)
+
(** {1 Core Types} *)
+
(** Network endpoint *)
+
module Endpoint : sig
+
type t
+
(** Network endpoint identified by host and port *)
+
+
val make : host:string -> port:int -> t
+
(** Create an endpoint *)
+
+
val host : t -> string
+
(** Get the hostname *)
+
+
val port : t -> int
+
(** Get the port number *)
+
+
val pp : Format.formatter -> t -> unit
+
(** Pretty-print an endpoint *)
+
val equal : t -> t -> bool
+
(** Compare two endpoints for equality *)
+
val hash : t -> int
+
(** Hash an endpoint *)
+
end
+
(** TLS configuration *)
+
module Tls_config : sig
+
type t
+
(** TLS configuration applied to all connections in a pool *)
+
val make : config:Tls.Config.client -> ?servername:string -> unit -> t
+
(** Create TLS configuration.
+
@param config TLS client configuration
+
@param servername Optional SNI server name override. If None, uses endpoint host *)
+
val config : t -> Tls.Config.client
+
(** Get the TLS client configuration *)
+
val servername : t -> string option
+
(** Get the SNI server name override *)
+
val pp : Format.formatter -> t -> unit
+
(** Pretty-print TLS configuration *)
+
end
+
(** Connection handle *)
+
module Connection : sig
+
type t
+
(** Opaque connection handle with metadata *)
+
val get_flow : t -> [ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t
+
(** Extract underlying Eio flow from connection *)
+
val endpoint : t -> Endpoint.t
+
(** Get the endpoint this connection is connected to *)
+
val created_at : t -> float
+
(** Get the creation timestamp *)
+
val last_used : t -> float
+
(** Get the last used timestamp *)
+
val use_count : t -> int
+
(** Get the number of times this connection has been used *)
+
val pp : Format.formatter -> t -> unit
+
(** Pretty-print connection metadata *)
+
end
(** Pool configuration *)
+
module Config : sig
+
type t
+
(** Pool configuration *)
+
val make :
+
?max_connections_per_endpoint:int ->
+
?max_idle_time:float ->
+
?max_connection_lifetime:float ->
+
?max_connection_uses:int ->
+
?health_check:([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) ->
+
?connect_timeout:float ->
+
?connect_retry_count:int ->
+
?connect_retry_delay:float ->
+
?on_connection_created:(Endpoint.t -> unit) ->
+
?on_connection_closed:(Endpoint.t -> unit) ->
+
?on_connection_reused:(Endpoint.t -> unit) ->
+
unit -> t
+
(** Create pool configuration with optional parameters.
+
Any omitted parameter takes the value documented under {!default}. *)
+
val default : t
+
(** Sensible defaults for most use cases:
+
- max_connections_per_endpoint: 10
+
- max_idle_time: 60.0s
+
- max_connection_lifetime: 300.0s
+
- max_connection_uses: None (unlimited)
+
- health_check: None
+
- connect_timeout: 10.0s
+
- connect_retry_count: 3
+
- connect_retry_delay: 0.1s
+
- hooks: None *)
+
+
val max_connections_per_endpoint : t -> int
+
val max_idle_time : t -> float
+
val max_connection_lifetime : t -> float
+
val max_connection_uses : t -> int option
+
val health_check : t -> ([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> bool) option
+
val connect_timeout : t -> float option
+
val connect_retry_count : t -> int
+
val connect_retry_delay : t -> float
+
+
val pp : Format.formatter -> t -> unit
+
(** Pretty-print configuration *)
+
end
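A hedged sketch of a customized configuration built with Config.make; the health check and hooks below are placeholders for illustration, not recommended checks:

let config =
  Conpool.Config.make
    ~max_connections_per_endpoint:20
    ~max_idle_time:15.0
    ~max_connection_uses:1000
    ~health_check:(fun _flow -> true)  (* placeholder: treat every idle connection as healthy *)
    ~on_connection_created:(fun ep -> Logs.info (fun m -> m "opened %a" Conpool.Endpoint.pp ep))
    ~on_connection_closed:(fun ep -> Logs.info (fun m -> m "closed %a" Conpool.Endpoint.pp ep))
    ()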
+
+
(** Statistics for an endpoint *)
+
module Stats : sig
+
type t
+
(** Statistics for a specific endpoint *)
+
+
val active : t -> int
+
(** Connections currently in use *)
+
+
val idle : t -> int
+
(** Connections in pool waiting to be reused *)
+
+
val total_created : t -> int
+
(** Total connections created (lifetime) *)
+
+
val total_reused : t -> int
+
(** Total times connections were reused *)
+
+
val total_closed : t -> int
+
(** Total connections closed *)
+
+
val errors : t -> int
+
(** Total connection errors *)
+
+
val pp : Format.formatter -> t -> unit
+
(** Pretty-print endpoint statistics *)
+
end
+
+
(** {1 Connection Pool} *)
+
+
type ('clock, 'net) t
+
(** Connection pool managing multiple endpoints, parameterized by clock and network types *)
val create :
sw:Eio.Switch.t ->
net:'net Eio.Net.t ->
clock:'clock Eio.Time.clock ->
+
?tls:Tls_config.t ->
+
?config:Config.t ->
unit -> ('clock Eio.Time.clock, 'net Eio.Net.t) t
(** Create connection pool bound to switch.
All connections will be closed when switch is released.
···
@param net Network interface for creating connections
@param clock Clock for timeouts and time-based validation
@param tls Optional TLS configuration applied to all connections
+
@param config Optional pool configuration (uses Config.default if not provided) *)
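A minimal creation sketch, assuming the library is linked as conpool; the endpoint is illustrative, and a plain TCP pool is created (pass ?tls with a Tls_config.t to enable TLS):

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let config = Conpool.Config.make ~max_connections_per_endpoint:4 () in
  let pool = Conpool.create ~sw ~net:env#net ~clock:env#clock ~config () in
  let endpoint = Conpool.Endpoint.make ~host:"127.0.0.1" ~port:8080 in
  Conpool.with_connection pool endpoint (fun flow ->
    Eio.Flow.copy_string "ping\n" flow)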
(** {1 Connection Acquisition & Release} *)
val with_connection :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Endpoint.t ->
([ `Close | `Flow | `R | `Shutdown | `W] Eio.Resource.t -> 'a) ->
'a
(** Acquire connection, use it, automatically release.
···
Example:
{[
+
let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
Conpool.with_connection pool endpoint (fun conn ->
(* Use conn for HTTP request, Redis command, etc. *)
Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn;
···
*)
val acquire :
+
sw:Eio.Switch.t ->
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Endpoint.t ->
+
Connection.t
+
(** Manually acquire a connection bound to [sw]. Call [close] to release it early; otherwise it is closed when the switch finishes ([release] is currently a stub).
+
Use [with_connection] instead unless you need explicit control.
+
+
The connection is bound to the provided switch and will be automatically
+
closed (but not released back to pool) when the switch finishes. *)
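A hedged usage sketch for the manual path (pool and endpoint as above); since release is still a stub in this diff, the switch is what actually closes the connection:

Eio.Switch.run @@ fun sw ->
  let conn = Conpool.acquire ~sw pool endpoint in
  let flow = Conpool.Connection.get_flow conn in
  Eio.Flow.copy_string "PING\r\n" flow
  (* conn is closed automatically when [sw] finishes *)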
val release :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Connection.t ->
unit
(** Return connection to pool. Connection must be in clean state.
If connection is unhealthy, call [close] instead. *)
val close :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Connection.t ->
unit
(** Close connection immediately, remove from pool. *)
(** {1 Connection Validation} *)
val is_healthy :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
?check_readable:bool ->
+
Connection.t ->
bool
(** Check if connection is healthy.
···
val validate_and_release :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Connection.t ->
unit
(** Validate connection health, then release to pool if healthy or close if not.
Equivalent to: if is_healthy pool conn then release pool conn else close pool conn *)
(** {1 Statistics & Monitoring} *)
val stats :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Endpoint.t ->
+
Stats.t
(** Get statistics for specific endpoint *)
val all_stats :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
(Endpoint.t * Stats.t) list
(** Get statistics for all endpoints in pool *)
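For monitoring, a small sketch that dumps every endpoint's counters using the printers above:

List.iter
  (fun (ep, st) ->
     Format.printf "%a@.%a@.@." Conpool.Endpoint.pp ep Conpool.Stats.pp st)
  (Conpool.all_stats pool)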
(** {1 Pool Management} *)
val close_idle_connections :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Endpoint.t ->
unit
(** Close all idle connections for endpoint (keeps active ones) *)
val close_all_connections :
('clock Eio.Time.clock, 'net Eio.Net.t) t ->
+
Endpoint.t ->
unit
(** Close all connections for endpoint (blocks until active ones released) *)
+7
stack/conpool/test/dune
···
···
+
(executable
+
(name test_localhost)
+
 (modules test_localhost)
 (libraries conpool eio_main logs logs.fmt))
+
+
(executable
+
(name test_simple)
+
 (modules test_simple)
 (libraries conpool eio_main logs logs.fmt))
+211
stack/conpool/test/test_localhost.ml
···
···
+
(** Test conpool with 16 localhost servers on different 127.0.* addresses *)
+
+
open Eio.Std
+
+
(** Create a simple echo server on a specific address and port *)
+
let create_server ~sw ~net ipaddr port connections_ref =
+
let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10
+
(`Tcp (ipaddr, port))
+
in
+
+
Eio.Fiber.fork ~sw (fun () ->
+
try
+
while true do
+
Eio.Net.accept_fork socket ~sw ~on_error:(fun ex ->
+
traceln "Server %a error: %s" Eio.Net.Sockaddr.pp (`Tcp (ipaddr, port))
+
(Printexc.to_string ex)
+
) (fun flow _addr ->
+
(* Track this connection *)
+
Atomic.incr connections_ref;
+
+
(* Simple protocol: read lines and echo them back, until EOF *)
+
try
+
let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
+
while true do
+
let line = Eio.Buf_read.line buf in
+
traceln "Server on %a:%d received: %s"
+
Eio.Net.Ipaddr.pp ipaddr port line;
+
+
Eio.Flow.copy_string (line ^ "\n") flow
+
done
+
with
+
| End_of_file ->
+
traceln "Server on %a:%d client disconnected"
+
Eio.Net.Ipaddr.pp ipaddr port;
+
Eio.Flow.close flow;
+
Atomic.decr connections_ref
+
| ex ->
+
traceln "Server on %a:%d error handling connection: %s"
+
Eio.Net.Ipaddr.pp ipaddr port
+
(Printexc.to_string ex);
+
Eio.Flow.close flow;
+
Atomic.decr connections_ref
+
)
+
done
+
with Eio.Cancel.Cancelled _ -> ()
+
)
+
+
(** Generate 16 different servers on 127.0.0.1 with different ports *)
+
let generate_localhost_addresses () =
+
List.init 16 (fun i ->
+
(* Use 127.0.0.1 for all, just different ports *)
+
let addr_str = "127.0.0.1" in
+
(* Create raw IPv4 address as 4 bytes *)
+
let raw_bytes = Bytes.create 4 in
+
Bytes.set raw_bytes 0 (Char.chr 127);
+
Bytes.set raw_bytes 1 (Char.chr 0);
+
Bytes.set raw_bytes 2 (Char.chr 0);
+
Bytes.set raw_bytes 3 (Char.chr 1);
+
let addr = Eio.Net.Ipaddr.of_raw (Bytes.to_string raw_bytes) in
+
(addr_str, addr, 10000 + i)
+
)
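Side note: Eio exposes the loopback address directly (test_requests.ml below uses it), so the raw-bytes construction above could be shortened to:

let addr = Eio.Net.Ipaddr.V4.loopback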
+
+
let () =
+
(* Setup logging *)
+
Logs.set_reporter (Logs_fmt.reporter ());
+
Logs.set_level (Some Logs.Info);
+
Logs.Src.set_level Conpool.src (Some Logs.Debug);
+
+
Eio_main.run @@ fun env ->
+
Switch.run @@ fun sw ->
+
+
traceln "=== Starting 16 localhost servers ===";
+
+
(* Generate addresses *)
+
let servers = generate_localhost_addresses () in
+
+
(* Create connection counters for each server *)
+
let connection_refs = List.map (fun _ -> Atomic.make 0) servers in
+
+
(* Start all servers *)
+
List.iter2 (fun (_addr_str, addr, port) conn_ref ->
+
traceln "Starting server on %a:%d"
+
Eio.Net.Ipaddr.pp addr port;
+
create_server ~sw ~net:env#net addr port conn_ref
+
) servers connection_refs;
+
+
(* Give servers time to start *)
+
Eio.Time.sleep env#clock 0.5;
+
+
traceln "\n=== Creating connection pool ===";
+
+
(* Create connection pool *)
+
let pool_config = Conpool.Config.make
+
~max_connections_per_endpoint:5
+
~max_idle_time:30.0
+
~max_connection_lifetime:60.0
+
()
+
in
+
+
let pool = Conpool.create
+
~sw
+
~net:env#net
+
~clock:env#clock
+
~config:pool_config
+
()
+
in
+
+
traceln "\n=== Stress testing with thousands of concurrent connections ===";
+
+
(* Disable debug logging for stress test *)
+
Logs.Src.set_level Conpool.src (Some Logs.Info);
+
+
(* Create endpoints for all servers *)
+
let endpoints = List.map (fun (addr_str, _addr, port) ->
+
Conpool.Endpoint.make ~host:addr_str ~port
+
) servers in
+
+
(* Stress test: thousands of concurrent requests across all 16 servers *)
+
let num_requests = 50000 in
+
+
traceln "Launching %d concurrent requests across %d endpoints..."
+
num_requests (List.length endpoints);
+
traceln "Pool config: max %d connections per endpoint"
+
(Conpool.Config.max_connections_per_endpoint pool_config);
+
+
let start_time = Unix.gettimeofday () in
+
let success_count = Atomic.make 0 in
+
let error_count = Atomic.make 0 in
+
let last_progress = ref 0 in
+
+
(* Generate list of (endpoint, request_id) pairs *)
+
let tasks = List.init num_requests (fun i ->
+
let endpoint = List.nth endpoints (i mod List.length endpoints) in
+
(endpoint, i)
+
) in
+
+
(* Run all requests concurrently with fiber limit *)
+
Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) ->
+
try
+
Conpool.with_connection pool endpoint (fun flow ->
+
let test_msg = Printf.sprintf "Request %d" req_id in
+
Eio.Flow.copy_string (test_msg ^ "\n") flow;
+
+
let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
+
let _response = Eio.Buf_read.line buf in
+
let count = Atomic.fetch_and_add success_count 1 + 1 in
+
+
(* Progress indicator every 5000 requests *)
+
if count / 5000 > !last_progress then begin
+
last_progress := count / 5000;
+
traceln " Progress: %d/%d (%.1f%%)"
+
count num_requests
+
(100.0 *. float_of_int count /. float_of_int num_requests)
+
end
+
)
+
with e ->
+
Atomic.incr error_count;
+
if Atomic.get error_count <= 10 then
+
traceln "Request %d to %a failed: %s"
+
req_id Conpool.Endpoint.pp endpoint (Printexc.to_string e)
+
) tasks;
+
+
let end_time = Unix.gettimeofday () in
+
let duration = end_time -. start_time in
+
let successful = Atomic.get success_count in
+
let failed = Atomic.get error_count in
+
+
traceln "\n=== Stress test results ===";
+
traceln "Total requests: %d" num_requests;
+
traceln "Successful: %d" successful;
+
traceln "Failed: %d" failed;
+
traceln "Duration: %.2fs" duration;
+
traceln "Throughput: %.0f req/s" (float_of_int successful /. duration);
+
traceln "Average latency: %.2fms" (duration *. 1000.0 /. float_of_int successful);
+
+
traceln "\n=== Connection pool statistics ===";
+
let all_stats = Conpool.all_stats pool in
+
+
(* Calculate totals *)
+
let total_created = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_created s) 0 all_stats in
+
let total_reused = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_reused s) 0 all_stats in
+
let total_closed = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.total_closed s) 0 all_stats in
+
let total_errors = List.fold_left (fun acc (_, s) -> acc + Conpool.Stats.errors s) 0 all_stats in
+
+
traceln "Total connections created: %d" total_created;
+
traceln "Total connections reused: %d" total_reused;
+
traceln "Total connections closed: %d" total_closed;
+
traceln "Total errors: %d" total_errors;
+
traceln "Connection reuse ratio: %.2fx (reused/created)"
+
(if total_created > 0 then float_of_int total_reused /. float_of_int total_created else 0.0);
+
traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
+
(if successful > 0 then 100.0 *. float_of_int total_reused /. float_of_int successful else 0.0)
+
total_reused;
+
+
traceln "\nPer-endpoint breakdown:";
+
List.iter (fun (endpoint, stats) ->
+
traceln " %a: created=%d reused=%d active=%d idle=%d"
+
Conpool.Endpoint.pp endpoint
+
(Conpool.Stats.total_created stats)
+
(Conpool.Stats.total_reused stats)
+
(Conpool.Stats.active stats)
+
(Conpool.Stats.idle stats)
+
) all_stats;
+
+
traceln "\n=== Verifying server-side connection counts ===";
+
List.iter2 (fun (addr_str, _addr, port) conn_ref ->
+
let count = Atomic.get conn_ref in
+
traceln "Server %s:%d - Active connections: %d" addr_str port count
+
) servers connection_refs;
+
+
traceln "\n=== Test completed successfully ==="
+56
stack/conpool/test/test_simple.ml
···
···
+
(** Simple test to debug connection issues *)
+
+
open Eio.Std
+
+
let () =
+
Logs.set_reporter (Logs_fmt.reporter ());
+
Logs.set_level (Some Logs.Debug);
+
Logs.Src.set_level Conpool.src (Some Logs.Debug);
+
+
Eio_main.run @@ fun env ->
+
Switch.run @@ fun sw ->
+
+
traceln "Starting simple server on 127.0.0.1:9000";
+
+
(* Start a simple echo server *)
+
let raw_bytes = Bytes.create 4 in
+
Bytes.set raw_bytes 0 (Char.chr 127);
+
Bytes.set raw_bytes 1 (Char.chr 0);
+
Bytes.set raw_bytes 2 (Char.chr 0);
+
Bytes.set raw_bytes 3 (Char.chr 1);
+
let ipaddr = Eio.Net.Ipaddr.of_raw (Bytes.to_string raw_bytes) in
+
+
let socket = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:10
+
(`Tcp (ipaddr, 9000))
+
in
+
+
Eio.Fiber.fork ~sw (fun () ->
+
Eio.Net.accept_fork socket ~sw ~on_error:raise (fun flow _addr ->
+
traceln "Server: accepted connection";
+
let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
+
let line = Eio.Buf_read.line buf in
+
traceln "Server: received: %s" line;
+
Eio.Flow.copy_string (line ^ "\n") flow;
+
Eio.Flow.close flow
+
)
+
);
+
+
Eio.Time.sleep env#clock 0.1;
+
+
traceln "Creating connection pool";
+
let pool = Conpool.create ~sw ~net:env#net ~clock:env#clock () in
+
+
traceln "Testing connection";
+
let endpoint = Conpool.Endpoint.make ~host:"127.0.0.1" ~port:9000 in
+
+
let response = Conpool.with_connection pool endpoint (fun flow ->
+
traceln "Client: sending message";
+
Eio.Flow.copy_string "test message\n" flow;
+
let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
+
let resp = Eio.Buf_read.line buf in
+
traceln "Client: received: %s" resp;
+
resp
+
) in
+
+
traceln "Response: %s" response;
+
traceln "Test passed!"
+165
stack/requests/lib/http_client.ml
···
···
+
(** Low-level HTTP/1.1 client over raw TCP connections for connection pooling *)
+
+
let src = Logs.Src.create "requests.http_client" ~doc:"Low-level HTTP client"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
(** Build HTTP/1.1 request as a string *)
+
let build_request ~method_ ~uri ~headers ~body_str =
+
let path = Uri.path uri in
+
let path = if path = "" then "/" else path in
+
let query = Uri.query uri in
+
let path_with_query =
+
if query = [] then path
+
else path ^ "?" ^ (Uri.encoded_of_query query)
+
in
+
+
let host = match Uri.host uri with
+
| Some h -> h
+
| None -> failwith "URI must have a host"
+
in
+
+
let port = match Uri.port uri with
+
| Some p -> ":" ^ string_of_int p
+
| None ->
+
match Uri.scheme uri with
+
| Some "https" -> ":443"
+
| Some "http" -> ":80"
+
| _ -> ""
+
in
+
+
(* Build request line *)
+
let request_line = Printf.sprintf "%s %s HTTP/1.1\r\n" method_ path_with_query in
+
+
(* Ensure Host header is present *)
+
let headers = if not (Headers.mem "host" headers) then
+
Headers.add "host" (host ^ port) headers
+
else headers in
+
+
(* Ensure Connection header for keep-alive *)
+
let headers = if not (Headers.mem "connection" headers) then
+
Headers.add "connection" "keep-alive" headers
+
else headers in
+
+
(* Add Content-Length if we have a body *)
+
let headers =
+
if body_str <> "" && not (Headers.mem "content-length" headers) then
+
let len = String.length body_str in
+
Headers.add "content-length" (string_of_int len) headers
+
else headers
+
in
+
+
(* Build headers section *)
+
let headers_str =
+
Headers.to_list headers
+
|> List.map (fun (k, v) -> Printf.sprintf "%s: %s\r\n" k v)
+
|> String.concat ""
+
in
+
+
request_line ^ headers_str ^ "\r\n" ^ body_str
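For reference, roughly what build_request produces for a GET of http://example.com/path?q=1 with an empty body; header order depends on the Headers implementation, and note the Host header picks up the ":80" default for plain http URIs:

(* GET /path?q=1 HTTP/1.1\r\n
   host: example.com:80\r\n
   connection: keep-alive\r\n
   \r\n *)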
+
+
(** Parse HTTP response status line *)
+
let parse_status_line line =
+
match String.split_on_char ' ' line with
+
| "HTTP/1.1" :: code :: _ | "HTTP/1.0" :: code :: _ ->
+
(try int_of_string code
+
with _ -> failwith ("Invalid status code: " ^ code))
+
| _ -> failwith ("Invalid status line: " ^ line)
+
+
(** Parse HTTP headers from buffer reader *)
+
let parse_headers buf_read =
+
let rec read_headers acc =
+
let line = Eio.Buf_read.line buf_read in
+
if line = "" then List.rev acc
+
else begin
+
match String.index_opt line ':' with
+
| None -> read_headers acc
+
| Some idx ->
+
let name = String.sub line 0 idx |> String.trim |> String.lowercase_ascii in
+
let value = String.sub line (idx + 1) (String.length line - idx - 1) |> String.trim in
+
read_headers ((name, value) :: acc)
+
end
+
in
+
read_headers [] |> Headers.of_list
+
+
(** Read body with Content-Length *)
+
let read_fixed_body buf_read length =
+
let buf = Buffer.create (Int64.to_int length) in
+
let rec read_n remaining =
+
if remaining > 0L then begin
+
let to_read = min 8192 (Int64.to_int remaining) in
+
let chunk = Eio.Buf_read.take to_read buf_read in
+
Buffer.add_string buf chunk;
+
read_n (Int64.sub remaining (Int64.of_int (String.length chunk)))
+
end
+
in
+
read_n length;
+
Buffer.contents buf
+
+
(** Read chunked body *)
+
let read_chunked_body buf_read =
+
let buf = Buffer.create 4096 in
+
let rec read_chunks () =
+
let size_line = Eio.Buf_read.line buf_read in
+
(* Parse hex chunk size, ignore extensions after ';' *)
+
let size_str = match String.index_opt size_line ';' with
+
| Some idx -> String.sub size_line 0 idx
+
| None -> size_line
+
in
+
let chunk_size = int_of_string ("0x" ^ size_str) in
+
if chunk_size = 0 then begin
+
(* Read trailing headers (if any) until empty line *)
+
let rec skip_trailers () =
+
let line = Eio.Buf_read.line buf_read in
+
if line <> "" then skip_trailers ()
+
in
+
skip_trailers ()
+
end else begin
+
let chunk = Eio.Buf_read.take chunk_size buf_read in
+
Buffer.add_string buf chunk;
+
let _crlf = Eio.Buf_read.line buf_read in (* Read trailing CRLF *)
+
read_chunks ()
+
end
+
in
+
read_chunks ();
+
Buffer.contents buf
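A worked example of the wire format read_chunked_body consumes: chunk sizes are hex, each chunk is followed by CRLF, and a zero-size chunk plus a blank line terminates the body.

(* 4\r\n
   Wiki\r\n
   5\r\n
   pedia\r\n
   0\r\n
   \r\n
   ...decodes to the body "Wikipedia". *)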
+
+
(** Make HTTP request over a pooled connection *)
+
let make_request ~method_ ~uri ~headers ~body_str flow =
+
Log.debug (fun m -> m "Making %s request to %s" method_ (Uri.to_string uri));
+
+
(* Build and send request *)
+
let request_str = build_request ~method_ ~uri ~headers ~body_str in
+
Eio.Flow.copy_string request_str flow;
+
+
(* Read and parse response *)
+
let buf_read = Eio.Buf_read.of_flow flow ~max_size:max_int in
+
+
(* Parse status line *)
+
let status_line = Eio.Buf_read.line buf_read in
+
let status = parse_status_line status_line in
+
+
Log.debug (fun m -> m "Received response status: %d" status);
+
+
(* Parse headers *)
+
let resp_headers = parse_headers buf_read in
+
+
(* Determine how to read body *)
+
let transfer_encoding = Headers.get "transfer-encoding" resp_headers in
+
let content_length = Headers.get "content-length" resp_headers |> Option.map Int64.of_string in
+
+
let body_str = match transfer_encoding, content_length with
+
| Some te, _ when String.lowercase_ascii te |> String.trim = "chunked" ->
+
Log.debug (fun m -> m "Reading chunked response body");
+
read_chunked_body buf_read
+
| _, Some len ->
+
Log.debug (fun m -> m "Reading fixed-length response body (%Ld bytes)" len);
+
read_fixed_body buf_read len
+
| Some other_te, None ->
+
Log.warn (fun m -> m "Unsupported transfer-encoding: %s, assuming no body" other_te);
+
""
+
| None, None ->
+
Log.debug (fun m -> m "No body indicated");
+
""
+
in
+
+
(status, resp_headers, body_str)
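A hedged sketch of how this composes with the pool; the module paths (Conpool, Http_client, Headers) are assumptions based on this diff, not a checked build:

let get pool endpoint uri =
  Conpool.with_connection pool endpoint (fun flow ->
    Http_client.make_request
      ~method_:"GET" ~uri ~headers:(Headers.of_list []) ~body_str:"" flow)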
+18
stack/requests/test/dune
···
···
+
(test
+
(name test_requests)
+
(libraries
+
requests
+
alcotest
+
eio
+
eio_main
+
cohttp
+
cohttp-eio
+
uri
+
yojson
+
logs
+
str)
+
(deps
+
  (package requests))
 (modules test_requests))

(executable
+
(name test_connection_pool)
+
(modules test_connection_pool)
+
(libraries requests eio_main logs logs.fmt conpool))
+63
stack/requests/test/test_connection_pool.ml
···
···
+
(** Test connection pooling with conpool integration *)
+
+
open Eio.Std
+
+
let test_connection_pooling () =
+
(* Initialize RNG for TLS *)
+
Mirage_crypto_rng_unix.use_default ();
+
+
Eio_main.run @@ fun env ->
+
Switch.run @@ fun sw ->
+
+
(* Configure logging to see connection pool activity *)
+
Logs.set_reporter (Logs_fmt.reporter ());
+
Logs.set_level (Some Logs.Info);
+
Logs.Src.set_level Conpool.src (Some Logs.Info);
+
Logs.Src.set_level Requests.One.src (Some Logs.Info);
+
+
traceln "=== Testing Connection Pooling ===\n";
+
+
(* Create a client with connection pooling *)
+
let client = Requests.One.create
+
~sw
+
~clock:env#clock
+
~net:env#net
+
~max_connections_per_host:5
+
~connection_idle_timeout:30.0
+
~verify_tls:true
+
()
+
in
+
+
traceln "Client created with connection pool";
+
traceln "Making 10 requests to example.com...\n";
+
+
(* Make multiple requests to the same host *)
+
let start_time = Unix.gettimeofday () in
+
+
for i = 1 to 10 do
+
traceln "Request %d:" i;
+
let response = Requests.One.get ~sw ~client "http://example.com" in
+
+
traceln " Status: %d" (Requests.Response.status_code response);
+
traceln " Content-Length: %s"
+
(match Requests.Response.content_length response with
+
| Some len -> Int64.to_string len
+
| None -> "unknown");
+
+
(* Body already drained - connection automatically returned to pool *)
+
traceln ""
+
done;
+
+
let elapsed = Unix.gettimeofday () -. start_time in
+
traceln "All 10 requests completed in %.2f seconds" elapsed;
+
traceln "Average: %.2f seconds per request" (elapsed /. 10.0);
+
+
traceln "\n=== Test completed successfully ==="
+
+
let () =
+
try
+
test_connection_pooling ()
+
with e ->
+
traceln "Test failed with exception: %s" (Printexc.to_string e);
+
Printexc.print_backtrace stdout;
+
exit 1
+897
stack/requests/test/test_requests.ml
···
···
+
open Eio_main
+
+
let port = ref 8088
+
+
let get_free_port () =
+
let p = !port in
+
incr port;
+
p
+
+
let string_contains s sub =
+
try
+
let _ = Str.search_forward (Str.regexp_string sub) s 0 in
+
true
+
with Not_found -> false
+
+
module Test_server = struct
+
open Cohttp_eio
+
+
let make_server ~port handler env =
+
let server_socket =
+
Eio.Net.listen env#net ~sw:env#sw ~backlog:128 ~reuse_addr:true
+
(`Tcp (Eio.Net.Ipaddr.V4.loopback, port))
+
in
+
let callback _conn req body =
+
let (resp, body_content) = handler ~request:req ~body in
+
Server.respond_string () ~status:(Http.Response.status resp)
+
~headers:(Http.Response.headers resp)
+
~body:body_content
+
in
+
let server = Server.make ~callback () in
+
Server.run server_socket server ~on_error:(fun exn ->
+
Logs.err (fun m -> m "Server error: %s" (Printexc.to_string exn))
+
)
+
+
let echo_handler ~request ~body =
+
let uri = Http.Request.resource request in
+
let meth = Http.Request.meth request in
+
let headers = Http.Request.headers request in
+
let body_str = Eio.Flow.read_all body in
+
+
let response_body =
+
`Assoc [
+
"method", `String (Cohttp.Code.string_of_method meth);
+
"uri", `String uri;
+
"headers", `Assoc (
+
Cohttp.Header.to_lines headers
+
|> List.map (fun line ->
+
match String.split_on_char ':' line with
+
| k :: (_ :: _ as rest) -> (String.trim k, `String (String.trim (String.concat ":" rest)))
+
| _ -> ("", `String line)
+
)
+
);
+
"body", `String body_str;
+
]
+
|> Yojson.Basic.to_string
+
in
+
+
let resp = Http.Response.make ~status:`OK () in
+
let resp_headers = Cohttp.Header.add_unless_exists
+
(Http.Response.headers resp) "content-type" "application/json"
+
in
+
({ resp with headers = resp_headers }, response_body)
+
+
let status_handler status_code ~request:_ ~body:_ =
+
let status = Cohttp.Code.status_of_code status_code in
+
let resp = Http.Response.make ~status () in
+
(resp, "")
+
+
let redirect_handler target_path ~request:_ ~body:_ =
+
let resp = Http.Response.make ~status:`Moved_permanently () in
+
let headers = Cohttp.Header.add
+
(Http.Response.headers resp) "location" target_path
+
in
+
({ resp with headers }, "")
+
+
let cookie_handler ~request ~body:_ =
+
let headers = Http.Request.headers request in
+
let cookies =
+
match Cohttp.Header.get headers "cookie" with
+
| Some cookie_str -> cookie_str
+
| None -> "no cookies"
+
in
+
+
let resp = Http.Response.make ~status:`OK () in
+
let resp_headers =
+
Http.Response.headers resp
+
|> (fun h -> Cohttp.Header.add h "set-cookie" "test_cookie=test_value; Path=/")
+
|> (fun h -> Cohttp.Header.add h "set-cookie" "session=abc123; Path=/; HttpOnly")
+
in
+
({ resp with headers = resp_headers },
+
cookies)
+
+
let auth_handler ~request ~body:_ =
+
let headers = Http.Request.headers request in
+
let auth_result =
+
match Cohttp.Header.get headers "authorization" with
+
| Some auth ->
+
if String.starts_with ~prefix:"Bearer " auth then
+
let token = String.sub auth 7 (String.length auth - 7) in
+
if token = "valid_token" then "authorized"
+
else "invalid token"
+
else if String.starts_with ~prefix:"Basic " auth then
+
"basic auth received"
+
else "unknown auth"
+
| None -> "no auth"
+
in
+
+
let status =
+
if auth_result = "authorized" || auth_result = "basic auth received"
+
then `OK
+
else `Unauthorized
+
in
+
let resp = Http.Response.make ~status () in
+
(resp, auth_result)
+
+
let json_handler ~request:_ ~body =
+
let body_str = Eio.Flow.read_all body in
+
let json =
+
try
+
let parsed = Yojson.Basic.from_string body_str in
+
`Assoc [
+
"received", parsed;
+
"echo", `Bool true;
+
]
+
with _ ->
+
`Assoc [
+
"error", `String "invalid json";
+
"received", `String body_str;
+
]
+
in
+
+
let resp = Http.Response.make ~status:`OK () in
+
let resp_headers = Cohttp.Header.add_unless_exists
+
(Http.Response.headers resp) "content-type" "application/json"
+
in
+
({ resp with headers = resp_headers },
+
Yojson.Basic.to_string json)
+
+
let timeout_handler clock delay ~request:_ ~body:_ =
+
Eio.Time.sleep clock delay;
+
let resp = Http.Response.make ~status:`OK () in
+
(resp,"delayed response")
+
+
let chunked_handler _clock chunks ~request:_ ~body:_ =
+
let resp = Http.Response.make ~status:`OK () in
+
let body_str = String.concat "" chunks in
+
(resp,body_str)
+
+
let large_response_handler size ~request:_ ~body:_ =
+
let data = String.make size 'X' in
+
let resp = Http.Response.make ~status:`OK () in
+
(resp,data)
+
+
let multipart_handler ~request ~body =
+
let headers = Http.Request.headers request in
+
let content_type = Cohttp.Header.get headers "content-type" in
+
let body_str = Eio.Flow.read_all body in
+
+
let result =
+
match content_type with
+
| Some ct when String.starts_with ~prefix:"multipart/form-data" ct ->
+
Printf.sprintf "Multipart received: %d bytes" (String.length body_str)
+
| _ -> "Not multipart"
+
in
+
+
let resp = Http.Response.make ~status:`OK () in
+
(resp,result)
+
+
let router clock ~request ~body =
+
let uri = Http.Request.resource request in
+
match uri with
+
| "/" | "/echo" -> echo_handler ~request ~body
+
| "/status/200" -> status_handler 200 ~request ~body
+
| "/status/404" -> status_handler 404 ~request ~body
+
| "/status/500" -> status_handler 500 ~request ~body
+
| "/redirect" -> redirect_handler "/redirected" ~request ~body
+
| "/redirected" ->
+
let resp = Http.Response.make ~status:`OK () in
+
(resp,"redirect successful")
+
| "/cookies" -> cookie_handler ~request ~body
+
| "/auth" -> auth_handler ~request ~body
+
| "/json" -> json_handler ~request ~body
+
| "/timeout" -> timeout_handler clock 2.0 ~request ~body
+
| "/chunked" ->
+
chunked_handler clock ["chunk1"; "chunk2"; "chunk3"] ~request ~body
+
| "/large" -> large_response_handler 10000 ~request ~body
+
| "/multipart" -> multipart_handler ~request ~body
+
| _ -> status_handler 404 ~request ~body
+
+
let start_server ~port env =
+
Eio.Fiber.fork ~sw:env#sw (fun () ->
+
make_server ~port (router env#clock) env
+
);
+
Eio.Time.sleep env#clock 0.1
+
end
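(* Editor's sketch, not part of the diff: every test below repeats the same
   set-up (free port, minimal env object, server start, client creation).
   A helper along these lines, built only from definitions in this file,
   would shrink each test to its assertions. *)
let with_test_server env f =
  Eio.Switch.run @@ fun sw ->
  let port = get_free_port () in
  let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
  let test_env = object
    method clock = env#clock
    method net = env#net
    method sw = sw
  end in
  Test_server.start_server ~port test_env;
  let req = Requests.create ~sw env in
  f ~req ~base_url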
+
+
let test_get_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.get req (base_url ^ "/echo") in
+
+
Alcotest.(check int) "GET status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let method_str =
+
json |> Yojson.Basic.Util.member "method" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check string) "GET method" "GET" method_str
+
+
let test_post_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let body = Requests.Body.text "test post data" in
+
let response = Requests.post req ~body (base_url ^ "/echo") in
+
+
Alcotest.(check int) "POST status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let received_body =
+
json |> Yojson.Basic.Util.member "body" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check string) "POST body" "test post data" received_body
+
+
let test_put_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let body = Requests.Body.text "put data" in
+
let response = Requests.put req ~body (base_url ^ "/echo") in
+
+
Alcotest.(check int) "PUT status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let method_str =
+
json |> Yojson.Basic.Util.member "method" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check string) "PUT method" "PUT" method_str
+
+
let test_delete_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.delete req (base_url ^ "/echo") in
+
+
Alcotest.(check int) "DELETE status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let method_str =
+
json |> Yojson.Basic.Util.member "method" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check string) "DELETE method" "DELETE" method_str
+
+
let test_patch_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let body = Requests.Body.of_string Requests.Mime.json {|{"patch": "data"}|} in
+
let response = Requests.patch req ~body (base_url ^ "/echo") in
+
+
Alcotest.(check int) "PATCH status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let method_str =
+
json |> Yojson.Basic.Util.member "method" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check string) "PATCH method" "PATCH" method_str
+
+
let test_head_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.head req (base_url ^ "/echo") in
+
+
Alcotest.(check int) "HEAD status" 200 (Requests.Response.status_code response)
+
+
let test_options_request () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.options req (base_url ^ "/echo") in
+
+
Alcotest.(check int) "OPTIONS status" 200 (Requests.Response.status_code response)
+
+
let test_custom_headers () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let headers =
+
Requests.Headers.empty
+
|> Requests.Headers.set "X-Custom-Header" "custom-value"
+
|> Requests.Headers.set "User-Agent" "test-agent"
+
in
+
let response = Requests.get req ~headers (base_url ^ "/echo") in
+
+
Alcotest.(check int) "Headers status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let headers_obj = json |> Yojson.Basic.Util.member "headers" in
+
+
let custom_header =
+
headers_obj
+
|> Yojson.Basic.Util.member "x-custom-header"
+
|> Yojson.Basic.Util.to_string_option
+
|> Option.value ~default:""
+
in
+
+
Alcotest.(check string) "Custom header" "custom-value" custom_header
+
+
let test_query_params () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let params = [("key1", "value1"); ("key2", "value2")] in
+
let response = Requests.get req ~params (base_url ^ "/echo") in
+
+
Alcotest.(check int) "Query params status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let uri = json |> Yojson.Basic.Util.member "uri" |> Yojson.Basic.Util.to_string in
+
+
Alcotest.(check bool) "Query params present" true
+
(string_contains uri "key1=value1" && string_contains uri "key2=value2")
+
+
let test_json_body () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let json_data = {|{"name": "test", "value": 42}|} in
+
let body = Requests.Body.of_string Requests.Mime.json json_data in
+
let response = Requests.post req ~body (base_url ^ "/json") in
+
+
Alcotest.(check int) "JSON body status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let received = json |> Yojson.Basic.Util.member "received" in
+
let name = received |> Yojson.Basic.Util.member "name" |> Yojson.Basic.Util.to_string in
+
+
Alcotest.(check string) "JSON field" "test" name
+
+
let test_form_data () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let form_data = [("field1", "value1"); ("field2", "value2")] in
+
let body = Requests.Body.form form_data in
+
let response = Requests.post req ~body (base_url ^ "/echo") in
+
+
Alcotest.(check int) "Form data status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let received_body =
+
json |> Yojson.Basic.Util.member "body" |> Yojson.Basic.Util.to_string
+
in
+
+
Alcotest.(check bool) "Form data encoded" true
+
(string_contains received_body "field1=value1" &&
+
string_contains received_body "field2=value2")
+
+
let test_status_codes () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
+
let resp_200 = Requests.get req (base_url ^ "/status/200") in
+
Alcotest.(check int) "Status 200" 200 (Requests.Response.status_code resp_200);
+
+
let resp_404 = Requests.get req (base_url ^ "/status/404") in
+
Alcotest.(check int) "Status 404" 404 (Requests.Response.status_code resp_404);
+
+
let resp_500 = Requests.get req (base_url ^ "/status/500") in
+
Alcotest.(check int) "Status 500" 500 (Requests.Response.status_code resp_500)
+
+
let test_redirects () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw ~follow_redirects:true env in
+
let response = Requests.get req (base_url ^ "/redirect") in
+
+
Alcotest.(check int) "Redirect followed" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
Alcotest.(check string) "Redirect result" "redirect successful" body_str
+
+
let test_no_redirect () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.request req ~follow_redirects:false ~method_:`GET (base_url ^ "/redirect") in
+
+
Alcotest.(check int) "Redirect not followed" 301
+
(Requests.Response.status_code response)
+
+
let test_cookies () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
+
let _first_response = Requests.get req (base_url ^ "/cookies") in
+
+
let second_response = Requests.get req (base_url ^ "/cookies") in
+
let body_str = Requests.Response.body second_response |> Eio.Flow.read_all in
+
+
Alcotest.(check bool) "Cookies sent back" true
+
(string_contains body_str "test_cookie=test_value")
+
+
let test_bearer_auth () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let auth = Requests.Auth.bearer ~token:"valid_token" in
+
let response = Requests.get req ~auth (base_url ^ "/auth") in
+
+
Alcotest.(check int) "Bearer auth status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
Alcotest.(check string) "Bearer auth result" "authorized" body_str
+
+
let test_basic_auth () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let auth = Requests.Auth.basic ~username:"user" ~password:"pass" in
+
let response = Requests.get req ~auth (base_url ^ "/auth") in
+
+
Alcotest.(check int) "Basic auth status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
Alcotest.(check string) "Basic auth result" "basic auth received" body_str
+
+
let test_timeout () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let timeout = Requests.Timeout.create ~total:0.5 () in
+
+
let exception_raised =
+
try
+
let _ = Requests.get req ~timeout (base_url ^ "/timeout") in
+
false
+
with _ -> true
+
in
+
+
Alcotest.(check bool) "Timeout triggered" true exception_raised
+
+
let test_concurrent_requests () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
+
let r1 = ref None in
+
let r2 = ref None in
+
let r3 = ref None in
+
+
Eio.Fiber.all [
+
(fun () -> r1 := Some (Requests.get req (base_url ^ "/status/200")));
+
(fun () -> r2 := Some (Requests.get req (base_url ^ "/status/404")));
+
(fun () -> r3 := Some (Requests.get req (base_url ^ "/status/500")));
+
];
+
+
let r1 = Option.get !r1 in
+
let r2 = Option.get !r2 in
+
let r3 = Option.get !r3 in
+
+
Alcotest.(check int) "Concurrent 1" 200 (Requests.Response.status_code r1);
+
Alcotest.(check int) "Concurrent 2" 404 (Requests.Response.status_code r2);
+
Alcotest.(check int) "Concurrent 3" 500 (Requests.Response.status_code r3)
+
+
let test_large_response () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.get req (base_url ^ "/large") in
+
+
Alcotest.(check int) "Large response status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
Alcotest.(check int) "Large response size" 10000 (String.length body_str)
+
+
let test_one_module () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let client = Requests.One.create ~clock:env#clock ~net:env#net () in
+
let response = Requests.One.get ~sw ~client (base_url ^ "/echo") in
+
+
Alcotest.(check int) "One module status" 200 (Requests.Response.status_code response)
+
+
let test_multipart () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let parts = [
+
{ Requests.Body.name = "field1";
+
filename = None;
+
content_type = Requests.Mime.text;
+
content = `String "value1" };
+
{ Requests.Body.name = "field2";
+
filename = Some "test.txt";
+
content_type = Requests.Mime.text;
+
content = `String "file content" };
+
] in
+
let body = Requests.Body.multipart parts in
+
let response = Requests.post req ~body (base_url ^ "/multipart") in
+
+
Alcotest.(check int) "Multipart status" 200 (Requests.Response.status_code response);
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
Alcotest.(check bool) "Multipart recognized" true
+
(String.starts_with ~prefix:"Multipart received:" body_str)
+
+
let test_response_headers () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
let response = Requests.get req (base_url ^ "/json") in
+
+
let content_type =
+
Requests.Response.headers response
+
|> Requests.Headers.get "content-type"
+
in
+
+
Alcotest.(check (option string)) "Response content-type"
+
(Some "application/json") content_type
+
+
let test_default_headers () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let default_headers =
+
Requests.Headers.empty
+
|> Requests.Headers.set "X-Default" "default-value"
+
in
+
let req = Requests.create ~sw ~default_headers env in
+
let response = Requests.get req (base_url ^ "/echo") in
+
+
let body_str = Requests.Response.body response |> Eio.Flow.read_all in
+
let json = Yojson.Basic.from_string body_str in
+
let headers_obj = json |> Yojson.Basic.Util.member "headers" in
+
+
let default_header =
+
headers_obj
+
|> Yojson.Basic.Util.member "x-default"
+
|> Yojson.Basic.Util.to_string_option
+
|> Option.value ~default:""
+
in
+
+
Alcotest.(check string) "Default header present" "default-value" default_header
+
+
let test_session_persistence () =
+
run @@ fun env ->
+
Eio.Switch.run @@ fun sw ->
+
let port = get_free_port () in
+
let base_url = Printf.sprintf "http://127.0.0.1:%d" port in
+
+
let test_env = object
+
method clock = env#clock
+
method net = env#net
+
method sw = sw
+
end in
+
Test_server.start_server ~port test_env;
+
+
let req = Requests.create ~sw env in
+
+
Requests.set_default_header req "X-Session" "session-123";
+
+
let auth = Requests.Auth.bearer ~token:"test_token" in
+
Requests.set_auth req auth;
+
+
let response1 = Requests.get req (base_url ^ "/echo") in
+
let body_str1 = Requests.Response.body response1 |> Eio.Flow.read_all in
+
let json1 = Yojson.Basic.from_string body_str1 in
+
let headers1 = json1 |> Yojson.Basic.Util.member "headers" in
+
+
let session_header =
+
headers1
+
|> Yojson.Basic.Util.member "x-session"
+
|> Yojson.Basic.Util.to_string_option
+
|> Option.value ~default:""
+
in
+
+
Alcotest.(check string) "Session header persisted" "session-123" session_header;
+
+
Requests.remove_default_header req "X-Session";
+
+
let response2 = Requests.get req (base_url ^ "/echo") in
+
let body_str2 = Requests.Response.body response2 |> Eio.Flow.read_all in
+
let json2 = Yojson.Basic.from_string body_str2 in
+
let headers2 = json2 |> Yojson.Basic.Util.member "headers" in
+
+
let session_header2 =
+
headers2
+
|> Yojson.Basic.Util.member "x-session"
+
|> Yojson.Basic.Util.to_string_option
+
in
+
+
Alcotest.(check (option string)) "Session header removed" None session_header2
+
+
let () =
+
Logs.set_reporter (Logs.format_reporter ());
+
Logs.set_level (Some Logs.Warning);
+
+
let open Alcotest in
+
run "Requests Tests" [
+
"HTTP Methods", [
+
test_case "GET request" `Quick test_get_request;
+
test_case "POST request" `Quick test_post_request;
+
test_case "PUT request" `Quick test_put_request;
+
test_case "DELETE request" `Quick test_delete_request;
+
test_case "PATCH request" `Quick test_patch_request;
+
test_case "HEAD request" `Quick test_head_request;
+
test_case "OPTIONS request" `Quick test_options_request;
+
];
+
"Request Features", [
+
test_case "Custom headers" `Quick test_custom_headers;
+
test_case "Query parameters" `Quick test_query_params;
+
test_case "JSON body" `Quick test_json_body;
+
test_case "Form data" `Quick test_form_data;
+
test_case "Multipart upload" `Quick test_multipart;
+
test_case "Default headers" `Quick test_default_headers;
+
];
+
"Response Handling", [
+
test_case "Status codes" `Quick test_status_codes;
+
test_case "Response headers" `Quick test_response_headers;
+
test_case "Large response" `Quick test_large_response;
+
];
+
"Redirects", [
+
test_case "Follow redirects" `Quick test_redirects;
+
test_case "No follow redirects" `Quick test_no_redirect;
+
];
+
"Authentication", [
+
test_case "Bearer auth" `Quick test_bearer_auth;
+
test_case "Basic auth" `Quick test_basic_auth;
+
];
+
"Session Features", [
+
test_case "Cookies" `Quick test_cookies;
+
test_case "Session persistence" `Quick test_session_persistence;
+
];
+
"Advanced", [
+
test_case "Timeout handling" `Quick test_timeout;
+
test_case "Concurrent requests" `Quick test_concurrent_requests;
+
test_case "One module" `Quick test_one_module;
+
];
+
]
+537
stack/river/lib/river_store.ml
···
···
+
(*
+
* Persistent storage for Atom feed entries using Cacheio
+
*)
+
+
let src = Logs.Src.create "river.store" ~doc:"River persistent storage"
+
module Log = (val Logs.src_log src : Logs.LOG)
+
+
(* Types *)
+
+
type stored_entry = {
+
atom_id : string;
+
title : string;
+
link : Uri.t option;
+
published : Ptime.t option;
+
updated : Ptime.t;
+
author_name : string;
+
author_email : string option;
+
content : string;
+
feed_url : string;
+
feed_name : string;
+
feed_title : string;
+
stored_at : Ptime.t;
+
}
+
+
type feed_info = {
+
url : string;
+
name : string;
+
title : string;
+
last_updated : Ptime.t;
+
entry_count : int;
+
}
+
+
type t = {
+
cache : Cacheio.t;
+
base_dir : Eio.Fs.dir_ty Eio.Path.t;
+
}
+
+
(* Helper functions *)
+
+
let make_feed_key feed_url =
+
(* Use an MD5 hash (via Digest) of the feed URL as the directory name for safety *)
+
let hash = Digest.string feed_url |> Digest.to_hex in
+
"feeds/" ^ hash
+
+
let make_entry_key feed_url atom_id =
+
(* Store entry under feed directory with atom_id hash *)
+
let feed_key = make_feed_key feed_url in
+
let entry_hash = Digest.string atom_id |> Digest.to_hex in
+
feed_key ^ "/entries/" ^ entry_hash
+
+
let make_feed_meta_key feed_url =
+
let feed_key = make_feed_key feed_url in
+
feed_key ^ "/meta.json"
+
+
(* JSON serialization *)
+
+
let entry_to_json entry =
+
`Assoc [
+
"atom_id", `String entry.atom_id;
+
"title", `String entry.title;
+
"link", (match entry.link with
+
| Some u -> `String (Uri.to_string u)
+
| None -> `Null);
+
"published", (match entry.published with
+
| Some t -> `String (Ptime.to_rfc3339 t)
+
| None -> `Null);
+
"updated", `String (Ptime.to_rfc3339 entry.updated);
+
"author_name", `String entry.author_name;
+
"author_email", (match entry.author_email with Some e -> `String e | None -> `Null);
+
"content", `String entry.content;
+
"feed_url", `String entry.feed_url;
+
"feed_name", `String entry.feed_name;
+
"feed_title", `String entry.feed_title;
+
"stored_at", `String (Ptime.to_rfc3339 entry.stored_at);
+
]
+
+
let entry_of_json json =
+
let open Yojson.Safe.Util in
+
let parse_time s =
+
match Ptime.of_rfc3339 s with
+
| Ok (t, _, _) -> t
+
| Error _ -> failwith ("Invalid timestamp: " ^ s)
+
in
+
{
+
atom_id = json |> member "atom_id" |> to_string;
+
title = json |> member "title" |> to_string;
+
link = json |> member "link" |> to_string_option |> Option.map Uri.of_string;
+
published = json |> member "published" |> to_string_option |> Option.map parse_time;
+
updated = json |> member "updated" |> to_string |> parse_time;
+
author_name = json |> member "author_name" |> to_string;
+
author_email = json |> member "author_email" |> to_string_option;
+
content = json |> member "content" |> to_string;
+
feed_url = json |> member "feed_url" |> to_string;
+
feed_name = json |> member "feed_name" |> to_string;
+
feed_title = json |> member "feed_title" |> to_string;
+
stored_at = json |> member "stored_at" |> to_string |> parse_time;
+
}
+
+
let feed_meta_to_json meta =
+
`Assoc [
+
"url", `String meta.url;
+
"name", `String meta.name;
+
"title", `String meta.title;
+
"last_updated", `String (Ptime.to_rfc3339 meta.last_updated);
+
]
+
+
let feed_meta_of_json json =
+
let open Yojson.Safe.Util in
+
let parse_time s =
+
match Ptime.of_rfc3339 s with
+
| Ok (t, _, _) -> t
+
| Error _ -> failwith ("Invalid timestamp: " ^ s)
+
in
+
{
+
url = json |> member "url" |> to_string;
+
name = json |> member "name" |> to_string;
+
title = json |> member "title" |> to_string;
+
last_updated = json |> member "last_updated" |> to_string |> parse_time;
+
entry_count = 0; (* Will be counted separately *)
+
}
+
+
(* Store creation *)
+
+
let create ~base_dir =
+
let cache_dir = Eio.Path.(base_dir / "river_store") in
+
(try
+
Eio.Path.mkdir ~perm:0o755 cache_dir
+
with Eio.Io (Eio.Fs.E (Already_exists _), _) -> ());
+
let cache = Cacheio.create ~base_dir:cache_dir in
+
Log.info (fun m -> m "Created River store at %a" Eio.Path.pp cache_dir);
+
{ cache; base_dir = cache_dir }
+
+
let create_with_xdge xdge =
+
let cache = Cacheio.create_with_xdge xdge in
+
let base_dir = Eio.Path.( / ) (Xdge.cache_dir xdge) "river_store" in
+
Log.info (fun m -> m "Created River store with XDG at %a" Eio.Path.pp base_dir);
+
{ cache; base_dir }
+
+
(* Convert Post.t to stored_entry *)
+
let entry_of_post ~feed_url ~feed_name ~feed_title (post : Post.t) =
+
let atom_id = match post.link with
+
| Some uri -> Uri.to_string uri
+
| None -> Digest.to_hex (Digest.string post.title)
+
in
+
let updated = match post.date with
+
| Some d -> d
+
| None -> Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get
+
in
+
let published = post.date in
+
{
+
atom_id;
+
title = post.title;
+
link = post.link;
+
published;
+
updated;
+
author_name = post.author;
+
author_email = if post.email = "" then None else Some post.email;
+
content = Soup.to_string post.content;
+
feed_url;
+
feed_name;
+
feed_title;
+
stored_at = Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get;
+
}
+
+
(* Convert Syndic.Atom.entry to stored_entry *)
+
let entry_of_atom ~feed_url ~feed_name ~feed_title (atom_entry : Syndic.Atom.entry) =
+
let atom_id = Uri.to_string atom_entry.id in
+
let updated = atom_entry.updated in
+
let published = match atom_entry.published with
+
| Some p -> Some p
+
| None -> Some atom_entry.updated
+
in
+
(* Extract author info - Syndic doesn't expose person record fields,
+
so we'll use placeholders and reconstruct via Atom.author later *)
+
let content = match atom_entry.content with
+
| Some (Syndic.Atom.Text s) -> s
+
| Some (Syndic.Atom.Html (_, s)) -> s
+
| Some (Syndic.Atom.Xhtml (_, nodes)) ->
+
let ns_prefix _ = Some "" in
+
String.concat "" (List.map (Syndic.XML.to_string ~ns_prefix) nodes)
+
| Some (Syndic.Atom.Mime _) | Some (Syndic.Atom.Src _) | None ->
+
(match atom_entry.summary with
+
| Some (Syndic.Atom.Text s) -> s
+
| Some (Syndic.Atom.Html (_, s)) -> s
+
| Some (Syndic.Atom.Xhtml (_, nodes)) ->
+
let ns_prefix _ = Some "" in
+
String.concat "" (List.map (Syndic.XML.to_string ~ns_prefix) nodes)
+
| None -> "")
+
in
+
let link = try
+
Some (List.find (fun l -> l.Syndic.Atom.rel = Syndic.Atom.Alternate) atom_entry.links).href
+
with Not_found ->
+
match atom_entry.links with
+
| l :: _ -> Some l.href
+
| [] -> None
+
in
+
{
+
atom_id;
+
title = Util.string_of_text_construct atom_entry.title;
+
link;
+
published;
+
updated;
+
author_name = feed_name; (* Use feed name as fallback *)
+
author_email = None;
+
content;
+
feed_url;
+
feed_name;
+
feed_title;
+
stored_at = Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get;
+
}
+
+
(* Feed metadata management *)
+
let update_feed_meta store ~feed_url ~feed_name ~feed_title ~sw:_ =
+
let key = make_feed_meta_key feed_url in
+
let meta = {
+
url = feed_url;
+
name = feed_name;
+
title = feed_title;
+
last_updated = Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get;
+
entry_count = 0;
+
} in
+
let json = feed_meta_to_json meta |> Yojson.Safe.to_string in
+
let source = Eio.Flow.string_source json in
+
Cacheio.put store.cache ~key ~source ~ttl:None ();
+
Log.debug (fun m -> m "Updated feed metadata for %s" feed_url)
+
+
let get_feed_meta store ~feed_url ~sw =
+
let key = make_feed_meta_key feed_url in
+
match Cacheio.get store.cache ~key ~sw with
+
| None -> None
+
| Some source ->
+
try
+
let json_str = Eio.Buf_read.(parse_exn take_all) source ~max_size:Int.max_int in
+
let json = Yojson.Safe.from_string json_str in
+
Some (feed_meta_of_json json)
+
with e ->
+
Log.err (fun m -> m "Failed to parse feed metadata: %s" (Printexc.to_string e));
+
None
+
+
(* Entry storage *)
+
+
let store_entry store ~feed_url ~feed_name ~feed_title ~post ~sw =
+
let entry = entry_of_post ~feed_url ~feed_name ~feed_title post in
+
let key = make_entry_key feed_url entry.atom_id in
+
let json = entry_to_json entry |> Yojson.Safe.to_string in
+
let source = Eio.Flow.string_source json in
+
Cacheio.put store.cache ~key ~source ~ttl:None ();
+
Log.debug (fun m -> m "Stored entry %s for feed %s" entry.atom_id feed_url);
+
(* Update feed metadata *)
+
update_feed_meta store ~feed_url ~feed_name ~feed_title ~sw
+
+
let store_posts store ~feed_url ~feed_name ~feed_title ~posts ~sw =
+
Log.info (fun m -> m "Storing %d posts for feed %s" (List.length posts) feed_url);
+
List.iter (fun post ->
+
store_entry store ~feed_url ~feed_name ~feed_title ~post ~sw
+
) posts;
+
Log.info (fun m -> m "Stored %d entries for feed %s" (List.length posts) feed_url)
+
+
let store_atom_entries store ~feed_url ~feed_name ~feed_title ~entries ~sw =
+
Log.info (fun m -> m "Storing %d Atom entries for feed %s" (List.length entries) feed_url);
+
List.iter (fun atom_entry ->
+
let entry = entry_of_atom ~feed_url ~feed_name ~feed_title atom_entry in
+
let key = make_entry_key feed_url entry.atom_id in
+
let json = entry_to_json entry |> Yojson.Safe.to_string in
+
let source = Eio.Flow.string_source json in
+
Cacheio.put store.cache ~key ~source ~ttl:None ();
+
Log.debug (fun m -> m "Stored Atom entry %s" entry.atom_id);
+
) entries;
+
update_feed_meta store ~feed_url ~feed_name ~feed_title ~sw;
+
Log.info (fun m -> m "Stored %d Atom entries for feed %s" (List.length entries) feed_url)
+
+
(* Entry retrieval *)
+
+
let get_entry store ~feed_url ~atom_id ~sw =
+
let key = make_entry_key feed_url atom_id in
+
match Cacheio.get store.cache ~key ~sw with
+
| None -> None
+
| Some source ->
+
try
+
let json_str = Eio.Buf_read.(parse_exn take_all) source ~max_size:Int.max_int in
+
let json = Yojson.Safe.from_string json_str in
+
Some (entry_of_json json)
+
with e ->
+
Log.err (fun m -> m "Failed to parse entry: %s" (Printexc.to_string e));
+
None
+
+
let list_entries store ~feed_url =
+
let feed_key = make_feed_key feed_url in
+
let prefix = feed_key ^ "/entries/" in
+
let entries = Cacheio.scan store.cache in
+
let feed_entries = List.filter_map (fun (cache_entry : Cacheio.Entry.t) ->
+
let key = Cacheio.Entry.key cache_entry in
+
if String.starts_with ~prefix key then
+
Eio.Switch.run @@ fun sw ->
+
match Cacheio.get store.cache ~key ~sw with
+
| None -> None
+
| Some source ->
+
try
+
let json_str = Eio.Buf_read.(parse_exn take_all) source ~max_size:Int.max_int in
+
let json = Yojson.Safe.from_string json_str in
+
Some (entry_of_json json)
+
with e ->
+
Log.err (fun m -> m "Failed to parse entry from scan: %s" (Printexc.to_string e));
+
None
+
else None
+
) entries in
+
(* Sort by updated time, newest first *)
+
List.sort (fun a b -> Ptime.compare b.updated a.updated) feed_entries
+
+
let list_entries_filtered store ~feed_url ?since ?until ?limit ?(sort=`Updated) () =
+
let entries = list_entries store ~feed_url in
+
(* Filter by time *)
+
let entries = match since with
+
| None -> entries
+
| Some t -> List.filter (fun e -> Ptime.is_later e.updated ~than:t || Ptime.equal e.updated t) entries
+
in
+
let entries = match until with
+
| None -> entries
+
| Some t -> List.filter (fun e -> Ptime.is_earlier e.updated ~than:t || Ptime.equal e.updated t) entries
+
in
+
(* Sort *)
+
let entries = match sort with
+
| `Published -> List.sort (fun a b ->
+
match a.published, b.published with
+
| Some pa, Some pb -> Ptime.compare pb pa
+
| None, Some _ -> 1
+
| Some _, None -> -1
+
| None, None -> Ptime.compare b.updated a.updated
+
) entries
+
| `Updated -> List.sort (fun a b -> Ptime.compare b.updated a.updated) entries
+
| `Stored -> List.sort (fun a b -> Ptime.compare b.stored_at a.stored_at) entries
+
in
+
(* Limit *)
+
match limit with
+
| None -> entries
+
| Some n -> List.filteri (fun i _ -> i < n) entries
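(* Editor's sketch, not part of the diff: a typical call, assuming a [store]
   value and a [feed_url]: the ten most recently updated entries from the
   last seven days, newest first. *)
let recent_week store ~feed_url =
  let now = Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get in
  let since =
    Ptime.sub_span now (Ptime.Span.of_int_s (7 * 24 * 60 * 60)) |> Option.get
  in
  list_entries_filtered store ~feed_url ~since ~limit:10 ~sort:`Updated ()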
+
+
let exists_entry store ~feed_url ~atom_id =
+
let key = make_entry_key feed_url atom_id in
+
Cacheio.exists store.cache ~key
+
+
let get_recent_entries store ?(limit=50) () =
+
let entries = Cacheio.scan store.cache in
+
let all_entries = List.filter_map (fun (cache_entry : Cacheio.Entry.t) ->
+
let key = Cacheio.Entry.key cache_entry in
+
if String.contains key '/' &&
+
String.ends_with ~suffix:"entries/" (String.sub key 0 (String.rindex key '/') ^ "/") then
+
Eio.Switch.run @@ fun sw ->
+
match Cacheio.get store.cache ~key ~sw with
+
| None -> None
+
| Some source ->
+
try
+
let json_str = Eio.Buf_read.(parse_exn take_all) source ~max_size:Int.max_int in
+
let json = Yojson.Safe.from_string json_str in
+
Some (entry_of_json json)
+
with e ->
+
Log.err (fun m -> m "Failed to parse entry: %s" (Printexc.to_string e));
+
None
+
else None
+
) entries in
+
let sorted = List.sort (fun a b -> Ptime.compare b.updated a.updated) all_entries in
+
List.filteri (fun i _ -> i < limit) sorted
+
+
(* Entry management *)
+
+
let delete_entry store ~feed_url ~atom_id =
+
let key = make_entry_key feed_url atom_id in
+
Cacheio.delete store.cache ~key;
+
Log.info (fun m -> m "Deleted entry %s from feed %s" atom_id feed_url)
+
+
let delete_feed store ~feed_url =
+
let feed_key = make_feed_key feed_url in
+
let prefix = feed_key ^ "/" in
+
let entries = Cacheio.scan store.cache in
+
let count = ref 0 in
+
List.iter (fun (cache_entry : Cacheio.Entry.t) ->
+
let key = Cacheio.Entry.key cache_entry in
+
if String.starts_with ~prefix key then begin
+
Cacheio.delete store.cache ~key;
+
incr count
+
end
+
) entries;
+
Log.info (fun m -> m "Deleted feed %s (%d entries)" feed_url !count)
+
+
let prune_entries store ~feed_url ~keep =
+
let entries = list_entries store ~feed_url in
+
let to_delete = List.filteri (fun i _ -> i >= keep) entries in
+
List.iter (fun entry ->
+
delete_entry store ~feed_url ~atom_id:entry.atom_id
+
) to_delete;
+
let deleted = List.length to_delete in
+
Log.info (fun m -> m "Pruned %d entries from feed %s (kept %d)" deleted feed_url keep);
+
deleted
+
+
let prune_old_entries store ~feed_url ~older_than =
+
let entries = list_entries store ~feed_url in
+
let to_delete = List.filter (fun e ->
+
Ptime.is_earlier e.updated ~than:older_than
+
) entries in
+
List.iter (fun entry ->
+
delete_entry store ~feed_url ~atom_id:entry.atom_id
+
) to_delete;
+
let deleted = List.length to_delete in
+
Log.info (fun m -> m "Pruned %d old entries from feed %s" deleted feed_url);
+
deleted
+
+
(* Feed information *)
+
+
let list_feeds store =
+
let feed_entries = Cacheio.scan store.cache in
+
let feed_metas = List.filter_map (fun (cache_entry : Cacheio.Entry.t) ->
+
let key = Cacheio.Entry.key cache_entry in
+
if String.ends_with ~suffix:"/meta.json" key then
+
Eio.Switch.run @@ fun sw ->
+
match Cacheio.get store.cache ~key ~sw with
+
| None -> None
+
| Some source ->
+
try
+
let json_str = Eio.Buf_read.(parse_exn take_all) source ~max_size:Int.max_int in
+
let json = Yojson.Safe.from_string json_str in
+
Some (feed_meta_of_json json)
+
with e ->
+
Log.err (fun m -> m "Failed to parse feed metadata: %s" (Printexc.to_string e));
+
None
+
else None
+
) feed_entries in
+
(* Count entries for each feed *)
+
List.map (fun meta ->
+
let entries = list_entries store ~feed_url:meta.url in
+
{ meta with entry_count = List.length entries }
+
) feed_metas
+
+
let get_feed_info store ~feed_url =
+
Eio.Switch.run @@ fun sw ->
+
match get_feed_meta store ~feed_url ~sw with
+
| None -> None
+
| Some meta ->
+
let entries = list_entries store ~feed_url in
+
Some { meta with entry_count = List.length entries }
+
+
let stats store =
+
Cacheio.stats store.cache
+
+
(* Maintenance *)
+
+
let expire store =
+
Cacheio.expire store.cache
+
+
let compact _store =
+
(* TODO: Implement compaction logic *)
+
Log.info (fun m -> m "Compaction not yet implemented")
+
+
(* Export/Import *)
+
+
let export_to_atom store ~feed_url ?title ?limit () =
+
let entries = match limit with
+
| None -> list_entries store ~feed_url
+
| Some n -> list_entries_filtered store ~feed_url ~limit:n ()
+
in
+
let atom_entries = List.map (fun entry ->
+
let id = Uri.of_string entry.atom_id in
+
let entry_title : Syndic.Atom.text_construct = Syndic.Atom.Text entry.title in
+
let links = match entry.link with
+
| Some uri -> [Syndic.Atom.link ~rel:Syndic.Atom.Alternate uri]
+
| None -> []
+
in
+
let entry_content : Syndic.Atom.content = Syndic.Atom.Html (None, entry.content) in
+
let author = Syndic.Atom.author ?email:entry.author_email entry.author_name in
+
let authors = (author, []) in
+
Syndic.Atom.entry ~id ~title:entry_title ~updated:entry.updated ?published:entry.published
+
~links ~content:entry_content ~authors ()
+
) entries in
+
let feed_title : Syndic.Atom.text_construct = match title with
+
| Some t -> Syndic.Atom.Text t
+
| None -> Syndic.Atom.Text ("Archive: " ^ feed_url)
+
in
+
let feed_id = Uri.of_string ("urn:river:archive:" ^ (Digest.string feed_url |> Digest.to_hex)) in
+
let feed_updated = match entries with
+
| [] -> Ptime.of_float_s (Unix.gettimeofday ()) |> Option.get
+
| e :: _ -> e.updated
+
in
+
{
+
Syndic.Atom.id = feed_id;
+
title = feed_title;
+
updated = feed_updated;
+
entries = atom_entries;
+
authors = [];
+
categories = [];
+
contributors = [];
+
generator = Some {
+
Syndic.Atom.version = Some "1.0";
+
uri = None;
+
content = "River Store";
+
};
+
icon = None;
+
links = [];
+
logo = None;
+
rights = None;
+
subtitle = None;
+
}
+
+
let import_from_atom store ~feed_url ~feed_name ~feed ~sw =
+
let entries = feed.Syndic.Atom.entries in
+
store_atom_entries store ~feed_url ~feed_name ~feed_title:(Util.string_of_text_construct feed.title) ~entries ~sw;
+
List.length entries
+
+
(* Pretty printing *)
+
+
let pp_entry fmt entry =
+
Format.fprintf fmt "@[<v 2>Entry:@,";
+
Format.fprintf fmt "ID: %s@," entry.atom_id;
+
Format.fprintf fmt "Title: %s@," entry.title;
+
Format.fprintf fmt "Link: %s@," (match entry.link with Some u -> Uri.to_string u | None -> "none");
+
Format.fprintf fmt "Published: %s@," (match entry.published with
+
| Some t -> Ptime.to_rfc3339 t
+
| None -> "unknown");
+
Format.fprintf fmt "Updated: %s@," (Ptime.to_rfc3339 entry.updated);
+
Format.fprintf fmt "Feed: %s (%s)@," entry.feed_name entry.feed_url;
+
Format.fprintf fmt "Stored: %s@]" (Ptime.to_rfc3339 entry.stored_at)
+
+
let pp_feed_info fmt info =
+
Format.fprintf fmt "@[<v 2>Feed:@,";
+
Format.fprintf fmt "Name: %s@," info.name;
+
Format.fprintf fmt "Title: %s@," info.title;
+
Format.fprintf fmt "URL: %s@," info.url;
+
Format.fprintf fmt "Last updated: %s@," (Ptime.to_rfc3339 info.last_updated);
+
Format.fprintf fmt "Entries: %d@]" info.entry_count
+
+
let pp fmt store =
+
let feeds = list_feeds store in
+
Format.fprintf fmt "@[<v 2>River Store:@,";
+
Format.fprintf fmt "Base dir: %a@," Eio.Path.pp store.base_dir;
+
Format.fprintf fmt "Feeds: %d@," (List.length feeds);
+
List.iter (fun feed ->
+
Format.fprintf fmt " - %s: %d entries@," feed.name feed.entry_count
+
) feeds;
+
Format.fprintf fmt "@]"
+234
stack/river/lib/river_store.mli
···
···
+
(** Persistent storage for Atom feed entries using Cacheio
+
+
River_store provides a persistent, per-feed storage system for Atom entries,
+
enabling long-term archival of feed items that may have expired upstream.
+
+
{2 Key Features}
+
+
- {b Per-feed storage}: Each feed's entries stored independently
+
- {b Atom ID keying}: Entries keyed by their unique Atom ID
+
- {b URL resolution}: Resolves all URLs relative to feed base URI
+
- {b Persistent caching}: Built on Cacheio for reliable file storage
+
- {b Entry management}: List, update, delete, and prune operations
+
- {b Metadata tracking}: Stores feed source, timestamps, and relationships
+
+
{2 Usage Example}
+
+
{[
+
let store = River_store.create ~base_dir:store_dir in
+
+
(* Store entries from a feed *)
+
Eio.Switch.run @@ fun sw ->
+
let feed = River.fetch env source in
+
let posts = River.posts [feed] in
+
River_store.store_posts store ~feed_url:source.url ~feed_name:source.name ~feed_title:"Example feed" ~posts ~sw;
+
+
(* List entries for a feed *)
+
let entries = River_store.list_entries store ~feed_url:source.url in
+
List.iter (fun entry ->
+
Printf.printf "%s: %s\n" entry.atom_id entry.title
+
) entries;
+
+
(* Get a specific entry *)
+
match River_store.get_entry store ~feed_url:source.url ~atom_id:"..." ~sw with
+
| Some entry -> print_endline entry.title
+
| None -> print_endline "not found"
+
]} *)
+
+
(** {1 Core Types} *)
+
+
(** Abstract type representing the store *)
+
type t
+
+
(** Stored entry with resolved URLs and metadata *)
+
type stored_entry = {
+
atom_id : string;
+
(** Unique Atom entry ID (used as key) *)
+
+
title : string;
+
(** Entry title *)
+
+
link : Uri.t option;
+
(** Primary link (resolved against feed base URI) *)
+
+
published : Ptime.t option;
+
(** Publication date *)
+
+
updated : Ptime.t;
+
(** Last update time *)
+
+
author_name : string;
+
(** Entry author name *)
+
+
author_email : string option;
+
(** Entry author email *)
+
+
content : string;
+
(** HTML content with resolved URLs *)
+
+
feed_url : string;
+
(** URL of the source feed *)
+
+
feed_name : string;
+
(** Name of the source feed *)
+
+
feed_title : string;
+
(** Title of the source feed *)
+
+
stored_at : Ptime.t;
+
(** When this entry was stored *)
+
}
+
+
(** Feed metadata *)
+
type feed_info = {
+
url : string;
+
(** Feed URL *)
+
+
name : string;
+
(** Feed name/label *)
+
+
title : string;
+
(** Feed title from metadata *)
+
+
last_updated : Ptime.t;
+
(** Last time feed was synced *)
+
+
entry_count : int;
+
(** Number of stored entries *)
+
}
+
+
(** {1 Store Creation} *)
+
+
(** Create a store at the specified base directory *)
+
val create : base_dir:Eio.Fs.dir_ty Eio.Path.t -> t
+
+
(** Create a store using an Xdge context for XDG-compliant paths *)
+
val create_with_xdge : Xdge.t -> t
+
+
(** {1 Entry Storage} *)
+
+
(** Store a single post entry from a feed *)
+
val store_entry :
+
t ->
+
feed_url:string ->
+
feed_name:string ->
+
feed_title:string ->
+
post:Post.t ->
+
sw:Eio.Switch.t ->
+
unit
+
+
(** Store multiple posts from a feed *)
+
val store_posts :
+
t ->
+
feed_url:string ->
+
feed_name:string ->
+
feed_title:string ->
+
posts:Post.t list ->
+
sw:Eio.Switch.t ->
+
unit
+
+
(** Store entries directly from Syndic.Atom.entry list *)
+
val store_atom_entries :
+
t ->
+
feed_url:string ->
+
feed_name:string ->
+
feed_title:string ->
+
entries:Syndic.Atom.entry list ->
+
sw:Eio.Switch.t ->
+
unit
+
+
(** {1 Entry Retrieval} *)
+
+
(** Get a specific entry by Atom ID *)
+
val get_entry :
+
t ->
+
feed_url:string ->
+
atom_id:string ->
+
sw:Eio.Switch.t ->
+
stored_entry option
+
+
(** List all entries for a feed *)
+
val list_entries : t -> feed_url:string -> stored_entry list
+
+
(** List entries with filtering and sorting options *)
+
val list_entries_filtered :
+
t ->
+
feed_url:string ->
+
?since:Ptime.t ->
+
?until:Ptime.t ->
+
?limit:int ->
+
?sort:[`Published | `Updated | `Stored] ->
+
unit ->
+
stored_entry list
+
+
(** Check if an entry exists *)
+
val exists_entry : t -> feed_url:string -> atom_id:string -> bool
+
+
(** Get the most recent entries across all feeds *)
+
val get_recent_entries : t -> ?limit:int -> unit -> stored_entry list
+
+
(** {1 Entry Management} *)
+
+
(** Delete a specific entry *)
+
val delete_entry : t -> feed_url:string -> atom_id:string -> unit
+
+
(** Delete all entries for a feed *)
+
val delete_feed : t -> feed_url:string -> unit
+
+
(** Prune old entries (keep most recent N per feed) *)
+
val prune_entries : t -> feed_url:string -> keep:int -> int
+
(** Returns number of entries deleted *)
+
+
(** Prune entries older than a given time; returns the number of entries deleted *)
+
val prune_old_entries : t -> feed_url:string -> older_than:Ptime.t -> int
+
+
(** {1 Feed Information} *)
+
+
(** List all feeds that have stored entries *)
+
val list_feeds : t -> feed_info list
+
+
(** Get information about a specific feed *)
+
val get_feed_info : t -> feed_url:string -> feed_info option
+
+
(** Get statistics about the store *)
+
val stats : t -> Cacheio.Stats.t
+
+
(** {1 Maintenance} *)
+
+
(** Clean up expired entries (respects TTL if set); returns the number of entries expired *)
+
val expire : t -> int
+
+
(** Compact storage (remove duplicate/orphaned data) *)
+
val compact : t -> unit
+
+
(** Export entries to an Atom feed *)
+
val export_to_atom :
+
t ->
+
feed_url:string ->
+
?title:string ->
+
?limit:int ->
+
unit ->
+
Syndic.Atom.feed
+
+
(** Import entries from an Atom feed; returns the number of entries imported *)
+
val import_from_atom :
+
t ->
+
feed_url:string ->
+
feed_name:string ->
+
feed:Syndic.Atom.feed ->
+
sw:Eio.Switch.t ->
+
int
+
+
(** {1 Pretty Printing} *)
+
+
(** Pretty printer for stored entries *)
+
val pp_entry : Format.formatter -> stored_entry -> unit
+
+
(** Pretty printer for feed info *)
+
val pp_feed_info : Format.formatter -> feed_info -> unit
+
+
(** Pretty printer for the store *)
+
val pp : Format.formatter -> t -> unit