···
-
(** Log source for this module, registered under the name
    ["requests.cache"]. The [Log] module defined right after it is bound to
    this source. *)
let src = Logs.Src.create "requests.cache" ~doc:"HTTP cache with cacheio"
-
(** Logging functions tied to [src]: the first-class module returned by
    [Logs.src_log src], unpacked with signature [Logs.LOG]. The rest of the
    file logs through [Log.debug], [Log.info] and [Log.warn]. *)
module Log = (val Logs.src_log src : Logs.LOG)
-
type cached_response = {
-
status : Cohttp.Code.status_code;
-
headers : Cohttp.Header.t;
-
cache_get_requests : bool;
-
cache_range_requests : bool;
-
cacheio : Cacheio.t option;
-
memory_cache : (string, cached_response * float) Hashtbl.t;
-
let create ~sw ~enabled ?(cache_get_requests=true) ?(cache_range_requests=true) ~cache_dir () =
-
| Some dir when enabled ->
-
Some (Cacheio.create ~base_dir:dir)
-
Log.warn (fun m -> m "Failed to create cacheio backend: %s. Using memory cache only."
-
(Printexc.to_string e));
-
{ sw; enabled; cache_get_requests; cache_range_requests; cacheio;
-
memory_cache = Hashtbl.create 100 }
-
let make_cache_key ~method_ ~url ~headers =
-
let method_str = match method_ with
-
| `GET -> "GET" | `HEAD -> "HEAD"
-
let url_str = Uri.to_string url in
-
let range_str = match Cohttp.Header.get headers "range" with
-
| Some r -> "_range:" ^ r
-
Printf.sprintf "%s_%s%s" method_str url_str range_str
-
let is_cacheable ~method_ ~status ~headers =
-
let code = Cohttp.Code.code_of_status status in
-
if code >= 200 && code < 300 then
-
match Cohttp.Header.get headers "cache-control" with
-
let cc_lower = String.lowercase_ascii cc in
-
let rec contains s sub pos =
-
if pos + String.length sub > String.length s then false
-
else if String.sub s pos (String.length sub) = sub then true
-
else contains s sub (pos + 1)
-
not (contains cc_lower "no-store" 0 ||
-
contains cc_lower "no-cache" 0 ||
-
contains cc_lower "private" 0)
-
code = 301 || code = 308
-
let parse_max_age headers =
-
match Cohttp.Header.get headers "cache-control" with
-
let parts = String.split_on_char ',' cc |> List.map String.trim in
-
List.find_map (fun part ->
-
let prefix = "max-age=" in
-
if String.starts_with ~prefix part then
-
let value = String.sub part (String.length prefix)
-
(String.length part - String.length prefix) in
-
try Some (float_of_string value) with _ -> None
-
(* JSON codec for cache metadata *)
-
module Metadata = struct
-
headers : (string * string) list;
-
(** [make status_code headers] builds a metadata record from its two fields.
    Both arguments are positional. This function is also the constructor
    handed to [Jsont.Object.map] for the ["CacheMetadata"] codec, whose
    members are declared as ["status_code"] then ["headers"].
    NOTE(review): the argument order presumably has to match that member
    order — confirm against the Jsont documentation before reordering. *)
let make status_code headers = { status_code; headers }
-
(** [status_code t] is the [status_code] field of [t]. It is used as the
    encoder for the ["status_code"] member of the metadata codec. *)
let status_code t = t.status_code
-
(** [headers t] is the [headers] field of [t], a list of string pairs. It is
    used as the encoder for the ["headers"] member of the metadata codec. *)
let headers t = t.headers
-
(** JSON codec for a single header, represented as a pair of strings.
    Built with [Jsont.t2] over [Jsont.string].
    NOTE(review): presumably this maps the pair to a two-element JSON array
    of strings — confirm against the [Jsont.t2] documentation. *)
let header_pair_jsont =
-
(* Decoder: combine the two decoded elements into a pair. *)
let dec x y = (x, y) in
-
(* Encoder: [i] selects which element to emit. Index 0 yields the first
   component; any other index yields the second. *)
let enc (x, y) i = if i = 0 then x else y in
-
Jsont.t2 ~dec ~enc Jsont.string
-
Jsont.Object.map ~kind:"CacheMetadata" make
-
|> Jsont.Object.mem "status_code" Jsont.int ~enc:status_code
-
|> Jsont.Object.mem "headers" (Jsont.list header_pair_jsont) ~enc:headers
-
let serialize_metadata ~status ~headers =
-
let status_code = Cohttp.Code.code_of_status status in
-
let headers_assoc = Cohttp.Header.to_list headers in
-
let metadata = Metadata.make status_code headers_assoc in
-
match Jsont_bytesrw.encode_string' Metadata.t_jsont metadata with
-
| Error e -> failwith (Fmt.str "Failed to serialize metadata: %s" (Jsont.Error.to_string e))
-
let deserialize_metadata json_str =
-
match Jsont_bytesrw.decode_string' Metadata.t_jsont json_str with
-
let status = Cohttp.Code.status_of_code (Metadata.status_code metadata) in
-
let headers = Cohttp.Header.of_list (Metadata.headers metadata) in
-
let get t ~method_ ~url ~headers =
-
if not t.enabled then None
-
else if method_ = `GET && not t.cache_get_requests then None
-
let key = make_cache_key ~method_ ~url ~headers in
-
(* Try cacheio first *)
-
(* Check for metadata entry *)
-
let metadata_key = key ^ ".meta" in
-
let body_key = key ^ ".body" in
-
if Cacheio.exists cache ~key:metadata_key && Cacheio.exists cache ~key:body_key then
-
Eio.Switch.run @@ fun sw ->
-
let metadata_opt = match Cacheio.get cache ~key:metadata_key ~sw with
-
let buf = Buffer.create 256 in
-
Eio.Flow.copy source (Eio.Flow.buffer_sink buf);
-
deserialize_metadata (Buffer.contents buf)
-
(match metadata_opt with
-
| Some (status, resp_headers) ->
-
(match Cacheio.get cache ~key:body_key ~sw with
-
let buf = Buffer.create 4096 in
-
Eio.Flow.copy source (Eio.Flow.buffer_sink buf);
-
let body = Buffer.contents buf in
-
Log.debug (fun m -> m "Cache hit for %s" (Uri.to_string url));
-
Some { status; headers = resp_headers; body }
-
Log.debug (fun m -> m "Cache body missing for %s" (Uri.to_string url));
-
Log.debug (fun m -> m "Cache metadata missing for %s" (Uri.to_string url));
-
(Log.debug (fun m -> m "Cache miss for %s" (Uri.to_string url));
-
(* Fall back to memory cache *)
-
match Hashtbl.find_opt t.memory_cache key with
-
| Some (response, expiry) when expiry > Unix.gettimeofday () ->
-
Log.debug (fun m -> m "Memory cache hit for %s" (Uri.to_string url));
-
Log.debug (fun m -> m "Cache miss for %s" (Uri.to_string url));
-
let get_stream t ~method_ ~url ~headers ~sw =
-
if not t.enabled then None
-
else if method_ = `GET && not t.cache_get_requests then None
-
let key = make_cache_key ~method_ ~url ~headers in
-
let metadata_key = key ^ ".meta" in
-
let body_key = key ^ ".body" in
-
if Cacheio.exists cache ~key:metadata_key && Cacheio.exists cache ~key:body_key then
-
(* Read metadata first *)
-
match Cacheio.get cache ~key:metadata_key ~sw with
-
let buf = Buffer.create 256 in
-
Eio.Flow.copy source (Eio.Flow.buffer_sink buf);
-
deserialize_metadata (Buffer.contents buf)
-
(match metadata_opt with
-
| Some (status, resp_headers) ->
-
(* Return body stream directly *)
-
(match Cacheio.get cache ~key:body_key ~sw with
-
Log.debug (fun m -> m "Streaming cache hit for %s" (Uri.to_string url));
-
Some (status, resp_headers, source)
-
let put t ~method_ ~url ~request_headers ~status ~headers ~body =
-
if not t.enabled then ()
-
else if is_cacheable ~method_ ~status ~headers then
-
let key = make_cache_key ~method_ ~url ~headers:request_headers in
-
let ttl = parse_max_age headers in
-
Log.debug (fun m -> m "Caching response for %s (ttl: %s)"
-
(match ttl with Some t -> Printf.sprintf "%.0fs" t | None -> "3600s"));
-
Eio.Switch.run @@ fun _sw ->
-
let metadata_key = key ^ ".meta" in
-
let metadata = serialize_metadata ~status ~headers in
-
let metadata_source = Eio.Flow.string_source metadata in
-
Cacheio.put cache ~key:metadata_key ~source:metadata_source ~ttl ();
-
let body_key = key ^ ".body" in
-
let body_source = Eio.Flow.string_source body in
-
Cacheio.put cache ~key:body_key ~source:body_source ~ttl ()
-
let cached_resp = { status; headers; body } in
-
let expiry = Unix.gettimeofday () +. Option.value ttl ~default:3600.0 in
-
Hashtbl.replace t.memory_cache key (cached_resp, expiry)
-
let put_stream t ~method_ ~url ~request_headers ~status ~headers ~body_source ~ttl =
-
if not t.enabled then ()
-
else if is_cacheable ~method_ ~status ~headers then
-
let key = make_cache_key ~method_ ~url ~headers:request_headers in
-
Log.debug (fun m -> m "Caching streamed response for %s (ttl: %s)"
-
(match ttl with Some t -> Printf.sprintf "%.0fs" t | None -> "3600s"));
-
Eio.Switch.run @@ fun _sw ->
-
let metadata_key = key ^ ".meta" in
-
let metadata = serialize_metadata ~status ~headers in
-
let metadata_source = Eio.Flow.string_source metadata in
-
Cacheio.put cache ~key:metadata_key ~source:metadata_source ~ttl ();
-
(* Store body directly from source *)
-
let body_key = key ^ ".body" in
-
Cacheio.put cache ~key:body_key ~source:body_source ~ttl ()
-
end_ : int64 option; (* None means to end of file *)
-
(* Parse Range: bytes=start-end *)
-
let prefix = "bytes=" in
-
let prefix_len = String.length prefix in
-
if String.length header >= prefix_len &&
-
String.sub header 0 prefix_len = prefix then
-
let range_str = String.sub header prefix_len (String.length header - prefix_len) in
-
match String.split_on_char '-' range_str with
-
(* bytes=N- means from N to end *)
-
(try Some { start = Int64.of_string start; end_ = None }
-
start = Int64.of_string start;
-
end_ = Some (Int64.of_string end_)
-
| None -> Printf.sprintf "bytes=%Ld-" t.start
-
| Some e -> Printf.sprintf "bytes=%Ld-%Ld" t.start e
-
let to_cacheio_range t ~total_size =
-
let end_ = match t.end_ with
-
| None -> Int64.pred total_size
-
| Some e -> min e (Int64.pred total_size)
-
(* Convert to Cacheio.Range.t *)
-
Cacheio.Range.create ~start:t.start ~end_
-
let download_range t ~sw ~url ~range ~on_chunk =
-
let range_header = Range.to_header range in
-
Log.debug (fun m -> m "Range request for %s: %s"
-
(Uri.to_string url) range_header);
-
let key = Uri.to_string url in
-
let cacheio_range = Range.to_cacheio_range range ~total_size:Int64.max_int in
-
(match Cacheio.get_range cache ~key ~range:cacheio_range ~sw with
-
let rec read_chunks () =
-
let chunk = Cstruct.create 8192 in
-
let n = Eio.Flow.single_read source chunk in
-
on_chunk (Cstruct.to_string ~off:0 ~len:n chunk);
-
| `Chunks chunk_sources ->
-
List.iter (fun (_range, source) ->
-
let rec read_chunk () =
-
let chunk = Cstruct.create 8192 in
-
let n = Eio.Flow.single_read source chunk in
-
on_chunk (Cstruct.to_string ~off:0 ~len:n chunk);
-
let put_chunk t ~url ~range ~data =
-
if not t.enabled || not t.cache_range_requests then ()
-
let key = Uri.to_string url in
-
let cacheio_range = Range.to_cacheio_range range ~total_size:Int64.max_int in
-
Eio.Switch.run @@ fun _sw ->
-
let source = Eio.Flow.string_source data in
-
Cacheio.put_chunk cache ~key ~range:cacheio_range ~source ()
-
Log.debug (fun m -> m "Cannot cache chunk for %s: no cacheio backend"
-
let has_complete t ~url ~total_size =
-
if not t.enabled then false
-
let key = Uri.to_string url in
-
Cacheio.has_complete_chunks cache ~key ~total_size
-
let missing_ranges t ~url ~total_size =
-
[{ Range.start = 0L; end_ = Some (Int64.pred total_size) }]
-
let key = Uri.to_string url in
-
let cacheio_ranges = Cacheio.missing_ranges cache ~key ~total_size in
-
{ Range.start = Cacheio.Range.start r;
-
end_ = Some (Cacheio.Range.end_ r) }
-
[{ Range.start = 0L; end_ = Some (Int64.pred total_size) }]
-
let coalesce_chunks t ~url =
-
if not t.enabled then false
-
let key = Uri.to_string url in
-
let promise = Cacheio.coalesce_chunks cache ~key ~verify:true () in
-
(match Eio.Promise.await promise with
-
Log.info (fun m -> m "Successfully coalesced chunks for %s" key);
-
Log.warn (fun m -> m "Failed to coalesce chunks for %s: %s"
-
key (Printexc.to_string exn));
-
if not t.enabled then ()
-
let key = make_cache_key ~method_:`GET ~url ~headers:(Cohttp.Header.init ()) in
-
Cacheio.delete cache ~key:(key ^ ".meta");
-
Cacheio.delete cache ~key:(key ^ ".body")
-
Log.debug (fun m -> m "Evicting cache for %s" (Uri.to_string url));
-
Hashtbl.remove t.memory_cache key
-
Log.info (fun m -> m "Clearing entire cache");
-
| Some cache -> Cacheio.clear cache
-
Hashtbl.clear t.memory_cache
-
temporary_entries : int;
-
memory_cache_entries : int;
-
cache_backend : string;
-
cache_get_requests : bool;
-
cache_range_requests : bool;
-
cacheio_stats : cacheio_stats option;
-
(** [make_cacheio_stats total_entries total_bytes expired_entries
    pinned_entries temporary_entries] builds a cacheio statistics record
    from its five fields.

    All arguments are positional, so callers must pass them in exactly this
    order. This function is also the constructor given to
    [Jsont.Object.map] for the ["CacheioStats"] codec, whose members are
    declared in the same order. *)
let make_cacheio_stats total_entries total_bytes expired_entries pinned_entries temporary_entries =
-
{ total_entries; total_bytes; expired_entries; pinned_entries; temporary_entries }
-
(** [make memory_cache_entries cache_backend enabled cache_get_requests
    cache_range_requests cacheio_stats] builds the overall cache statistics
    record from its six fields.

    All arguments are positional, so callers must pass them in exactly this
    order. This function is also the constructor given to
    [Jsont.Object.map] for the ["CacheStats"] codec, whose members are
    declared in the same order. [cacheio_stats] is encoded as an optional
    member of that codec. *)
let make memory_cache_entries cache_backend enabled cache_get_requests cache_range_requests cacheio_stats =
-
{ memory_cache_entries; cache_backend; enabled; cache_get_requests; cache_range_requests; cacheio_stats }
-
let cacheio_stats_jsont =
-
Jsont.Object.map ~kind:"CacheioStats" make_cacheio_stats
-
|> Jsont.Object.mem "total_entries" Jsont.int ~enc:(fun t -> t.total_entries)
-
|> Jsont.Object.mem "total_bytes" Jsont.int ~enc:(fun t -> t.total_bytes)
-
|> Jsont.Object.mem "expired_entries" Jsont.int ~enc:(fun t -> t.expired_entries)
-
|> Jsont.Object.mem "pinned_entries" Jsont.int ~enc:(fun t -> t.pinned_entries)
-
|> Jsont.Object.mem "temporary_entries" Jsont.int ~enc:(fun t -> t.temporary_entries)
-
Jsont.Object.map ~kind:"CacheStats" make
-
|> Jsont.Object.mem "memory_cache_entries" Jsont.int ~enc:(fun t -> t.memory_cache_entries)
-
|> Jsont.Object.mem "cache_backend" Jsont.string ~enc:(fun t -> t.cache_backend)
-
|> Jsont.Object.mem "enabled" Jsont.bool ~enc:(fun t -> t.enabled)
-
|> Jsont.Object.mem "cache_get_requests" Jsont.bool ~enc:(fun t -> t.cache_get_requests)
-
|> Jsont.Object.mem "cache_range_requests" Jsont.bool ~enc:(fun t -> t.cache_range_requests)
-
|> Jsont.Object.opt_mem "cacheio_stats" cacheio_stats_jsont ~enc:(fun t -> t.cacheio_stats)
-
match Jsont_bytesrw.encode_string' ~format:Jsont.Indent t_jsont t with
-
let msg = Jsont.Error.to_string e in
-
failwith (Printf.sprintf "Failed to encode stats: %s" msg)
-
let stats = Cacheio.stats cache in
-
Some (Stats.make_cacheio_stats
-
(Cacheio.Stats.entry_count stats)
-
(Int64.to_int (Cacheio.Stats.total_size stats))
-
(Cacheio.Stats.expired_count stats)
-
(Cacheio.Stats.pinned_count stats)
-
(Cacheio.Stats.temporary_count stats))
-
(Hashtbl.length t.memory_cache)
-
(if Option.is_some t.cacheio then "cacheio" else "memory")