(** File-backed streaming cache with Maildir++-style naming.

    Every key is hashed with SHA-256.  The first four hex digits select a
    two-level directory ([ab/cd/]); the remaining digits start the file name,
    followed by [":2"], an optional [":<expiry>"] (absolute Unix time, whole
    seconds) and an optional [",<flags>"].  Next to each data file a sidecar
    named [<file>.key] stores the original, un-hashed key so that {!scan} can
    report it. *)

open Eio

(** Module aliases *)
module Flags = Flags
module Entry = Entry
module Stats = Stats

(** {1 Internal Key Management} *)

(** Hex-encoded SHA-256 digest of [key]. *)
let hash_key key = Digestif.SHA256.(to_hex (digest_string key))

(** The two directory components (hex digits 0-1 and 2-3 of the hash) under
    which [key] is stored.
    @raise Invalid_argument if the hash is shorter than four characters. *)
let key_to_dirs key =
  let hash = hash_key key in
  if String.length hash < 4 then invalid_arg "hash too short"
  else [ String.sub hash 0 2; String.sub hash 2 2 ]

(* Hash of [key] without the four digits consumed by the directory levels. *)
let hash_rest_of_key key =
  let hash = hash_key key in
  String.sub hash 4 (String.length hash - 4)

(** File name (without directory) for [key]:
    [<hash_rest>:2[:<ttl>][,<flags>]].  [ttl] is written truncated to an
    integer; empty flag sets are omitted. *)
let key_to_filename ?ttl ?flags key =
  let ttl_part =
    match ttl with
    | None -> ""
    | Some ttl_val -> Printf.sprintf ":%d" (int_of_float ttl_val)
  in
  let flags_part =
    match flags with
    | None -> ""
    | Some f when Flags.is_empty f -> ""
    | Some f -> "," ^ Flags.to_string f
  in
  hash_rest_of_key key ^ ":2" ^ ttl_part ^ flags_part

(** Parse a cache file name into [(hash_rest, ttl, flags)].

    Recognised shapes:
    - [<hash>:2]
    - [<hash>:2,<flags>] and [<hash>:2:<ttl>,<flags>] (the exact output of
      {!key_to_filename}; previously these were rejected or mis-parsed)
    - [<hash>:2:<ttl>]
    - names whose last [':']-separated component contains ['-'], which are
      treated as chunk files with a trailing byte range.

    Returns [None] for anything else, or if parsing raises. *)
let parse_filename filename =
  (* Legacy flag extraction: the first comma-separated segment, or the second
     one when the first is empty. *)
  let first_flag_segment s =
    match String.split_on_char ',' s with
    | "" :: flags_part :: _ | flags_part :: _ -> flags_part
    | [] -> ""
  in
  (* Exactly one component after ":2": either "<ttl>", ",<flags>" or
     "<ttl>,<flags>". *)
  let parse_single single =
    match String.split_on_char ',' single with
    | [ ttl_str ] -> float_of_string_opt ttl_str, ""
    | "" :: flags :: _ -> None, flags
    | ttl_str :: flags :: _ ->
      (match float_of_string_opt ttl_str with
       | Some _ as ttl -> ttl, flags
       | None ->
         (* Not a number before the comma: keep the historical behaviour of
            treating that first segment as the flags. *)
         None, ttl_str)
    | [] -> None, ""
  in
  (* The part of a chunk name that sits between ":2:" and the final
     ":<range>". *)
  let parse_chunk_prefix before_colon =
    match String.split_on_char ',' before_colon with
    | "" :: flags :: _ -> None, flags
    | ttl :: flags :: _ -> float_of_string_opt ttl, flags
    | [ ttl_or_flags ] ->
      if String.length ttl_or_flags = 0 then None, ""
      else if ttl_or_flags.[0] >= '0' && ttl_or_flags.[0] <= '9' then
        float_of_string_opt ttl_or_flags, ""
      else None, ttl_or_flags
    | [] -> None, ""
  in
  (* Two or more components after ":2". *)
  let parse_multi rest =
    let rest_str = String.concat ":" rest in
    match String.rindex_opt rest_str ':' with
    | None -> None, first_flag_segment rest_str
    | Some idx ->
      let before_colon = String.sub rest_str 0 idx in
      let after_colon =
        String.sub rest_str (idx + 1) (String.length rest_str - idx - 1)
      in
      if String.contains after_colon '-' then
        (* Looks like a byte range: parse as a chunk file. *)
        parse_chunk_prefix before_colon
      else
        (* Normal file with colons in flags *)
        None, first_flag_segment rest_str
  in
  try
    match String.split_on_char ':' filename with
    | [ hash_rest; version ] when String.starts_with ~prefix:"2," version ->
      (* "<hash>:2,<flags>": no TTL, flags directly after the version. *)
      let flags_str =
        match String.split_on_char ',' version with
        | _ :: flags :: _ -> flags
        | _ -> ""
      in
      Some (hash_rest, None, Flags.of_string flags_str)
    | hash_rest :: "2" :: rest ->
      let ttl, flags_str =
        match rest with
        | [] -> None, ""
        | [ single ] -> parse_single single
        | _ -> parse_multi rest
      in
      Some (hash_rest, ttl, Flags.of_string flags_str)
    | _ -> None
  with _ -> None

type t = {
  base_dir : Fs.dir_ty Path.t;
  tmp_dir : Fs.dir_ty Path.t;
  mutex : Mutex.t;
}

(** {1 Internal Helpers} *)

(** Get current Unix timestamp *)
let now () = Unix.time ()

(** Check if a TTL has expired.  [None] never expires; [Some ttl] is an
    absolute Unix time. *)
let is_expired ttl_opt =
  match ttl_opt with
  | None -> false
  | Some ttl -> now () > ttl

(* Directory that holds every file belonging to [key]. *)
let key_dir base_dir key =
  List.fold_left (fun acc d -> Path.(acc / d)) base_dir (key_to_dirs key)

(* Sidecar files carry the original key and are never cache entries. *)
let is_sidecar name = String.ends_with ~suffix:".key" name

(* Path of the sidecar belonging to the data file at [path].  An Eio path is a
   (directory, relative path) pair, so the suffix is appended to the relative
   part. *)
let sidecar_of_path (root, rel) = (root, rel ^ ".key")

(* Best-effort removal: filesystem errors are ignored, anything else
   (including cancellation) propagates. *)
let unlink_quietly path =
  try Path.unlink path with Eio.Io _ -> ()

(* Remove a data file and its sidecar.  A missing data file is not an error;
   other failures to remove it propagate.  Sidecar removal is best-effort. *)
let remove_file_and_sidecar path =
  (match Path.unlink path with
   | () -> ()
   | exception Eio.Io (Eio.Fs.E (Not_found _), _) -> ());
  unlink_quietly (sidecar_of_path path)

(* Every non-chunk, non-sidecar file stored for [key], expired or not, as
   [(path, ttl, flags)] in directory-listing order.  Returns [[]] if the
   directory cannot be read. *)
let find_candidates base_dir key =
  let dir_path = key_dir base_dir key in
  let prefix = hash_rest_of_key key ^ ":2" in
  match Path.read_dir dir_path with
  | exception Eio.Io _ -> []
  | entries ->
    List.filter_map
      (fun entry ->
         if is_sidecar entry || not (String.starts_with ~prefix entry) then
           None
         else
           match parse_filename entry with
           | Some (_, ttl, flags) when not (Flags.is_chunk flags) ->
             Some (Path.(dir_path / entry), ttl, flags)
           | Some _ | None -> None)
      entries

(** Find the live (non-expired, non-chunk) cache file for [key], if any.
    Sidecar [.key] files are never returned. *)
let find_file base_dir key =
  List.find_opt
    (fun (_, ttl, _) -> not (is_expired ttl))
    (find_candidates base_dir key)

(* Create [path] as a directory unless it already exists. *)
let mkdir_if_missing path =
  try Path.mkdir ~perm:0o755 path
  with Eio.Io (Eio.Fs.E (Already_exists _), _) -> ()

(** Ensure the directory structure for [key] exists and return the innermost
    directory. *)
let ensure_dirs t key =
  List.fold_left
    (fun base d ->
       let next = Path.(base / d) in
       mkdir_if_missing next;
       next)
    t.base_dir (key_to_dirs key)

(* Stream [source] into a fresh file under [t.tmp_dir] and return its path.
   The temporary file is removed again if the copy fails. *)
let write_temp_file t ~key ~suffix ~source =
  let rand_hex = Printf.sprintf "%016Lx" (Random.bits64 ()) in
  let temp_name =
    Printf.sprintf "tmp.%s.%s%s" (hash_key key) rand_hex suffix
  in
  let temp_path = Path.(t.tmp_dir / temp_name) in
  (try
     Switch.run @@ fun sw ->
     let sink = Path.open_out ~sw ~create:(`Or_truncate 0o644) temp_path in
     Flow.copy source sink;
     Flow.close sink
   with exn ->
     unlink_quietly temp_path;
     raise exn);
  temp_path

(* Write the sidecar [<name>.key] in [dir], containing the original [key]. *)
let write_sidecar dir name key =
  let key_file_path = Path.(dir / (name ^ ".key")) in
  Switch.run @@ fun sw ->
  let key_sink =
    Path.open_out ~sw ~create:(`Or_truncate 0o644) key_file_path
  in
  Flow.copy (Flow.string_source key) key_sink;
  Flow.close key_sink

(* Rename a data file and, best-effort, its sidecar.  Does nothing when both
   paths are the same. *)
let rename_with_sidecar old_path new_path =
  if Path.native_exn old_path <> Path.native_exn new_path then begin
    Path.rename old_path new_path;
    try Path.rename (sidecar_of_path old_path) (sidecar_of_path new_path)
    with Eio.Io _ -> ()
  end

(* Give the file at [old_path] the name that encodes [new_flags], keeping its
   TTL.  Must be called with [t.mutex] held. *)
let retag t ~key ~old_path ~ttl new_flags =
  let dir = ensure_dirs t key in
  let new_name = key_to_filename ?ttl ~flags:new_flags key in
  rename_with_sidecar old_path Path.(dir / new_name)

(** {1 Creation} *)

(** Create a cache rooted at [base_dir], creating [base_dir] and its [tmp]
    sub-directory if needed. *)
let create ~base_dir =
  (* Note: RNG initialization should be done by the application *)
  mkdir_if_missing base_dir;
  let tmp_dir = Path.(base_dir / "tmp") in
  mkdir_if_missing tmp_dir;
  { base_dir; tmp_dir; mutex = Mutex.create () }

(** {1 Streaming Operations} *)

(** Open the live entry for [key] for reading, attached to [sw].  Returns
    [None] when there is no live entry.  Does not take the cache mutex. *)
let get t ~key ~sw =
  match find_file t.base_dir key with
  | None -> None
  | Some (path, _ttl, _flags) ->
    let file = Path.open_in ~sw path in
    Some (file :> Flow.source_ty Resource.t)

(** Store the contents of [source] under [key], replacing every earlier
    non-chunk file for that key (including expired ones).

    [ttl] is a lifetime in seconds relative to now; it is stored as an
    absolute expiry time.  The data is first streamed to a temporary file, so
    a failed copy leaves the previous entry untouched. *)
let put t ~key ~source ?(ttl=None) ?(flags=Flags.empty) () =
  (* Convert relative TTL to absolute expiry time *)
  let ttl_abs = Option.map (fun seconds -> now () +. seconds) ttl in
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  let dir = ensure_dirs t key in
  let temp_path = write_temp_file t ~key ~suffix:"" ~source in
  (* Only drop the old files once the new data is safely on disk. *)
  List.iter
    (fun (old_path, _, _) -> remove_file_and_sidecar old_path)
    (find_candidates t.base_dir key);
  let final_name = key_to_filename ?ttl:ttl_abs ~flags key in
  Path.rename temp_path Path.(dir / final_name);
  write_sidecar dir final_name key

(** {1 Basic Operations} *)

(** [true] iff a live entry exists for [key]. *)
let exists t ~key = Option.is_some (find_file t.base_dir key)

(** Remove every non-chunk file stored for [key] (expired or not) together
    with its sidecar.  Chunk files are left alone. *)
let delete t ~key =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  List.iter
    (fun (path, _, _) -> remove_file_and_sidecar path)
    (find_candidates t.base_dir key)

(* Remove only the expired non-chunk files stored for [key]. *)
let delete_expired t ~key =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  List.iter
    (fun (path, ttl, _) ->
       if is_expired ttl then remove_file_and_sidecar path)
    (find_candidates t.base_dir key)

(** Size in bytes of the live entry for [key]; [None] if there is none or it
    cannot be stat'ed. *)
let size t ~key =
  match find_file t.base_dir key with
  | Some (path, _, _) ->
    (try Some (Optint.Int63.to_int64 (Path.stat ~follow:false path).size)
     with _ -> None)
  | None -> None

(** Path of the live entry for [key], if any. *)
let get_path t ~key =
  Option.map (fun (path, _, _) -> path) (find_file t.base_dir key)

(** {1 Flag Operations} *)

(** Replace the flags of the live entry for [key].  The entry (and its
    sidecar) is renamed; nothing happens when there is no live entry. *)
let set_flags t ~key flags =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  match find_file t.base_dir key with
  | Some (old_path, ttl, _old_flags) -> retag t ~key ~old_path ~ttl flags
  | None -> ()

(** Add [flag] to the live entry for [key], if it is not already set. *)
let add_flag t ~key flag =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  match find_file t.base_dir key with
  | Some (old_path, ttl, flags) ->
    let new_flags = Flags.add flag flags in
    if not (Flags.equal flags new_flags) then
      retag t ~key ~old_path ~ttl new_flags
  | None -> ()

(** Remove [flag] from the live entry for [key], if it is set. *)
let remove_flag t ~key flag =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  match find_file t.base_dir key with
  | Some (old_path, ttl, flags) ->
    let new_flags = Flags.remove flag flags in
    if not (Flags.equal flags new_flags) then
      retag t ~key ~old_path ~ttl new_flags
  | None -> ()

(** Flags of the live entry for [key], if any. *)
let get_flags t ~key =
  Option.map (fun (_, _, flags) -> flags) (find_file t.base_dir key)

(** {1 Cache Management} *)

(* Read the original key from the sidecar of [filename] in [dir].  Raises if
   the sidecar is missing, unreadable or larger than 4096 bytes. *)
let read_sidecar_key dir filename =
  let key_path = Path.(dir / (filename ^ ".key")) in
  Switch.run @@ fun sw ->
  let source = Path.open_in ~sw key_path in
  let buf_read = Eio.Buf_read.of_flow ~max_size:4096 source in
  let content = Eio.Buf_read.take_all buf_read in
  Eio.Flow.close source;
  content

(** Add an {!Entry.t} to [acc] for every parseable file found below
    [base_dir/dir1/*/].  Sidecars are skipped.  Scanning is best-effort:
    directories or files that cannot be read are silently ignored. *)
let scan_dir base_dir acc dir1 =
  let dir1_path = Path.(base_dir / dir1) in
  let scan_file dir2 dir2_path acc filename =
    (* Skip .key files themselves *)
    if is_sidecar filename then acc
    else
      match parse_filename filename with
      | None -> acc
      | Some (hash_rest, ttl, flags) ->
        let path = Path.(dir2_path / filename) in
        (try
           let stat = Path.stat ~follow:false path in
           let key =
             try read_sidecar_key dir2_path filename
             with _ ->
               (* Fall back to the full hash when no usable sidecar exists. *)
               dir1 ^ dir2 ^ hash_rest
           in
           let entry =
             Entry.create ~key
               ~size:(Optint.Int63.to_int64 stat.size)
               ~mtime:stat.mtime ~ttl ~flags
           in
           entry :: acc
         with _ -> acc)
  in
  let scan_subdir acc dir2 =
    let dir2_path = Path.(dir1_path / dir2) in
    match Path.read_dir dir2_path with
    | exception _ -> acc
    | filenames -> List.fold_left (scan_file dir2 dir2_path) acc filenames
  in
  match Path.read_dir dir1_path with
  | exception _ -> acc
  | subdirs -> List.fold_left scan_subdir acc subdirs

(** List every entry in the cache (the [tmp] directory is excluded).  Returns
    [[]] if the base directory cannot be read.  Does not take the mutex. *)
let scan t =
  match Path.read_dir t.base_dir with
  | exception _ -> []
  | names ->
    names
    |> List.filter (fun name -> name <> "tmp")
    |> List.fold_left (scan_dir t.base_dir) []

(** Aggregate statistics over {!scan}. *)
let stats t = Stats.of_entries (scan t)

(** Remove expired files and return the number of expired entries seen by the
    scan.  Each key is handled under its own lock acquisition.  Expired chunk
    files are counted but not removed. *)
let expire t =
  List.fold_left
    (fun count entry ->
       if Entry.is_expired entry then begin
         delete_expired t ~key:(Entry.key entry);
         count + 1
       end
       else count)
    0 (scan t)

(** Delete every entry that is not pinned. *)
let clear t =
  List.iter
    (fun entry ->
       if not (Entry.is_pinned entry) then delete t ~key:(Entry.key entry))
    (scan t)

(** Delete every entry marked temporary. *)
let clear_temporary t =
  List.iter
    (fun entry ->
       if Entry.is_temporary entry then delete t ~key:(Entry.key entry))
    (scan t)

(** {1 Pretty Printing} *)

let pp fmt t =
  Format.fprintf fmt "Cache{base=%s}" (Path.native_exn t.base_dir)

(** {1 Convenience Functions} *)

(** Create a cache in the cache directory reported by [xdge]. *)
let create_with_xdge xdge =
  let base_dir = Xdge.cache_dir xdge in
  create ~base_dir

(** {1 Chunk Support} *)

module Range = Range
module Chunk = Chunk

(** Store [source] as the chunk of [key] covering [range].  The [`Chunk] flag
    is always added.  A sidecar with the original key is written next to the
    chunk.  Does not take the cache mutex.

    NOTE(review): [ttl] is handed to [Chunk.make_filename] unchanged, whereas
    {!put} converts its [ttl] to an absolute time first — confirm which
    convention [Chunk.make_filename] expects. *)
let put_chunk t ~key ~range ~source ?(ttl=None) ?(flags=Flags.empty) () =
  let flags = Flags.add `Chunk flags in
  let hash = hash_key key in
  let filename = Chunk.make_filename ~hash ~range ?ttl ~flags () in
  let dir = ensure_dirs t key in
  let temp_path = write_temp_file t ~key ~suffix:".chunk" ~source in
  Path.rename temp_path Path.(dir / filename);
  write_sidecar dir filename key

(** Look up data for [range] of [key].

    - [`Complete src]: a live entry flagged complete exists; [src] is the
      whole file positioned at the start of [range].
    - [`Chunks l]: the chunks overlapping [range], sorted by range, each with
      an open source.
    - [`Not_found]: neither exists.

    All opened files are attached to [sw].

    NOTE(review): in the [`Complete] case a failed seek is ignored and the
    source is not limited to the end of [range]; callers must stop reading
    themselves. *)
let get_range t ~key ~range ~sw =
  match find_file t.base_dir key with
  | Some (path, _, flags) when Flags.is_complete flags ->
    let file = Path.open_in ~sw path in
    (try
       let (_ : Optint.Int63.t) =
         Eio.File.seek file (Optint.Int63.of_int64 (Range.start range)) `Set
       in
       ()
     with _ -> ());
    `Complete (file :> Flow.source_ty Resource.t)
  | _ ->
    let overlapping =
      Chunk.find_chunks ~base_dir:t.base_dir ~key
      |> List.filter (fun c -> Range.overlaps (Chunk.range c) range)
    in
    (match overlapping with
     | [] -> `Not_found
     | chunks ->
       let sources =
         List.map
           (fun chunk ->
              let source = Path.open_in ~sw (Chunk.path chunk) in
              (Chunk.range chunk, (source :> Flow.source_ty Resource.t)))
           (Chunk.sort_by_range chunks)
       in
       `Chunks sources)

(** All chunks currently stored for [key]. *)
let list_chunks t ~key = Chunk.find_chunks ~base_dir:t.base_dir ~key

(** Whether the stored chunks of [key] cover [0, total_size) completely. *)
let has_complete_chunks t ~key ~total_size =
  Chunk.is_complete_sequence (list_chunks t ~key) ~total_size

(** Ranges of [0, total_size) not covered by any stored chunk of [key]. *)
let missing_ranges t ~key ~total_size =
  Chunk.missing_ranges (list_chunks t ~key) ~total_size

(** Concatenate the chunks of [key] into a single cache file.

    The work is done synchronously under the cache mutex; the returned
    promise is already resolved when this function returns.  It holds
    [Ok ()] on success or [Error exn] when there are no chunks, the chunks do
    not form a complete sequence, size verification fails ([verify = true]),
    or an exception occurred.  On success the chunks and their sidecars are
    removed and a sidecar is written for the assembled file.

    NOTE(review): [ttl] is written to the file name as-is, i.e. it is
    interpreted as an absolute expiry time here, unlike {!put} which takes a
    relative lifetime — confirm the intended convention. *)
let coalesce_chunks t ~key ?(verify=false) ?(flags=Flags.empty) ?(ttl=None) () =
  let promise, resolver = Eio.Promise.create () in
  let fail msg = Eio.Promise.resolve resolver (Error (Failure msg)) in
  (Mutex.use_rw ~protect:false t.mutex @@ fun () ->
   let chunks = list_chunks t ~key in
   match Chunk.sort_by_range chunks with
   | [] -> fail "No chunks found"
   | sorted ->
     (* The end of the last chunk determines the expected total size. *)
     let last = List.hd (List.rev sorted) in
     let total_size = Int64.succ (Range.end_ (Chunk.range last)) in
     if not (Chunk.is_complete_sequence sorted ~total_size) then
       fail "Chunks do not form complete sequence"
     else begin
       let final_filename = key_to_filename ?ttl ~flags key in
       let rand_hex = Printf.sprintf "%016Lx" (Random.bits64 ()) in
       let temp_name =
         Printf.sprintf "tmp.%s.%s.coalesce" (hash_key key) rand_hex
       in
       let temp_path = Path.(t.tmp_dir / temp_name) in
       let dir = ensure_dirs t key in
       let final_path = Path.(dir / final_filename) in
       try
         Switch.run @@ fun sw ->
         let sink =
           Path.open_out ~sw ~create:(`Or_truncate 0o644) temp_path
         in
         List.iter
           (fun chunk ->
              let source = Path.open_in ~sw (Chunk.path chunk) in
              Flow.copy source sink;
              Flow.close source)
           sorted;
         Flow.close sink;
         let verification_ok =
           if verify then
             (* The assembled file must have exactly the expected size. *)
             try
               let stat = Path.stat ~follow:false temp_path in
               let file_size = Optint.Int63.to_int64 stat.size in
               file_size = total_size
             with _ -> false
           else true
         in
         if not verification_ok then begin
           unlink_quietly temp_path;
           fail "Size verification failed"
         end
         else begin
           (* Atomic rename, then record the original key beside it. *)
           Path.rename temp_path final_path;
           write_sidecar dir final_filename key;
           (* The chunks (and their sidecars) are no longer needed. *)
           List.iter
             (fun chunk ->
                let chunk_path = Chunk.path chunk in
                unlink_quietly chunk_path;
                unlink_quietly (sidecar_of_path chunk_path))
             chunks;
           Eio.Promise.resolve resolver (Ok ())
         end
       with exn ->
         (* Clean up temp file on error *)
         unlink_quietly temp_path;
         Eio.Promise.resolve resolver (Error exn)
     end);
  promise

(** Remove every chunk stored for [key] together with that chunk's own
    sidecar.  Best-effort; the complete file for [key] and its sidecar are
    not touched. *)
let cleanup_chunks t ~key =
  List.iter
    (fun chunk ->
       let chunk_path = Chunk.path chunk in
       unlink_quietly chunk_path;
       unlink_quietly (sidecar_of_path chunk_path))
    (list_chunks t ~key)