My agentic slop goes here. Not intended for anyone else!
(** File-backed streaming cache with Maildir++-style naming.

    The entry for [key] lives at [BASE/AA/BB/NAME], where [AABB...] is the
    hex SHA-256 of [key] and [NAME] is built by {!key_to_filename}.  A
    sidecar file [NAME.key] next to it stores the original key so that
    {!scan} can report it. *)

open Eio

(** Module aliases *)
module Flags = Flags
module Entry = Entry
module Stats = Stats

(** {1 Internal Key Management} *)

(** [hash_key key] is the hex-encoded SHA-256 digest of [key]. *)
let hash_key key = Digestif.SHA256.(to_hex (digest_string key))

(** [key_to_dirs key] is the two-level directory fan-out for [key]: the
    first and second pairs of characters of [hash_key key].
    @raise Invalid_argument if the hash is shorter than 4 characters
    (not possible for SHA-256). *)
let key_to_dirs key =
  let hash = hash_key key in
  if String.length hash < 4 then invalid_arg "hash too short"
  else [ String.sub hash 0 2; String.sub hash 2 2 ]

(* The part of the hash not consumed by the directory fan-out; every file
   name belonging to [key] starts with it. *)
let hash_rest_of_key key =
  let hash = hash_key key in
  String.sub hash 4 (String.length hash - 4)

(* Directory holding the files for [key] under [base_dir] (not created). *)
let key_dir base_dir key =
  List.fold_left (fun dir d -> Path.(dir / d)) base_dir (key_to_dirs key)

(* Sidecar file that stores the original key for the data file [dir/name]. *)
let sidecar_path dir name = Path.(dir / (name ^ ".key"))

let is_sidecar filename = String.ends_with ~suffix:".key" filename

(** [key_to_filename ?ttl ?flags key] is the data-file name for [key]:
    [REST:2], then [":EXPIRY"] when [ttl] is given, then [",FLAGS"] when
    [flags] is non-empty.  [ttl] is an absolute Unix time; it is truncated
    to whole seconds. *)
let key_to_filename ?ttl ?flags key =
  let ttl_part =
    match ttl with
    | None -> ""
    | Some expiry -> Printf.sprintf ":%d" (int_of_float expiry)
  in
  let flags_part =
    match flags with
    | Some f when not (Flags.is_empty f) -> "," ^ Flags.to_string f
    | Some _ | None -> ""
  in
  hash_rest_of_key key ^ ":2" ^ ttl_part ^ flags_part

(** [parse_filename filename] decodes a cache file name into
    [(hash_rest, ttl, flags)].  It returns [None] when [filename] is not a
    cache data file; [".key"] sidecars are rejected.

    Accepted shapes:
    - the ones {!key_to_filename} produces: [REST:2], [REST:2,FLAGS],
      [REST:2:EXPIRY] and [REST:2:EXPIRY,FLAGS];
    - chunk names ending in a [":START-END"] range segment, e.g.
      [REST:2:EXPIRY,FLAGS:START-END], [REST:2:,FLAGS:START-END] or
      [REST:2,FLAGS:START-END].

    An expiry that does not parse as a float is read as "no TTL". *)
let parse_filename filename =
  let ttl_of_string s = if s = "" then None else float_of_string_opt s in
  let split_at_comma s =
    match String.index_opt s ',' with
    | None -> None
    | Some i ->
        Some (String.sub s 0 i, String.sub s (i + 1) (String.length s - i - 1))
  in
  (* Regular names: "EXPIRY,FLAGS" or a bare "EXPIRY". *)
  let regular_meta seg =
    match split_at_comma seg with
    | Some (ttl, flags) -> (ttl_of_string ttl, flags)
    | None -> (ttl_of_string seg, "")
  in
  (* Chunk names may also carry bare flags; a leading digit marks an expiry. *)
  let chunk_meta seg =
    match split_at_comma seg with
    | Some (ttl, flags) -> (ttl_of_string ttl, flags)
    | None when seg = "" -> (None, "")
    | None when seg.[0] >= '0' && seg.[0] <= '9' -> (ttl_of_string seg, "")
    | None -> (None, seg)
  in
  let is_range seg = String.contains seg '-' in
  let decoded =
    if is_sidecar filename then None
    else
      match String.split_on_char ':' filename with
      | hash_rest :: "2" :: rest -> (
          match List.rev rest with
          | [] -> Some (hash_rest, None, "")
          | [ seg ] ->
              let ttl, flags = regular_meta seg in
              Some (hash_rest, ttl, flags)
          | range :: meta_rev when is_range range ->
              let ttl, flags =
                chunk_meta (String.concat ":" (List.rev meta_rev))
              in
              Some (hash_rest, ttl, flags)
          | _ :: _ ->
              (* Unknown multi-segment shape: keep the historical
                 best-effort reading (no TTL, first comma field as flags). *)
              let flags =
                match String.split_on_char ',' (String.concat ":" rest) with
                | "" :: f :: _ | f :: _ -> f
                | [] -> ""
              in
              Some (hash_rest, None, flags))
      | hash_rest :: info :: rest when String.starts_with ~prefix:"2," info
        -> (
          (* No TTL: flags are attached directly to the ":2" marker. *)
          let flags = String.sub info 2 (String.length info - 2) in
          match rest with
          | [] -> Some (hash_rest, None, flags)
          | [ range ] when is_range range -> Some (hash_rest, None, flags)
          | _ -> None)
      | _ -> None
  in
  match decoded with
  | None -> None
  | Some (hash_rest, ttl, flags_str) -> (
      (* Flags.of_string's failure modes are not visible here; treat any
         failure as an unparsable name, as the original code did. *)
      match Flags.of_string flags_str with
      | flags -> Some (hash_rest, ttl, flags)
      | exception _ -> None)

type t = {
  base_dir : Fs.dir_ty Path.t;
  tmp_dir : Fs.dir_ty Path.t;
  mutex : Mutex.t;
}

(** {1 Internal Helpers} *)

(** Current Unix time, in seconds. *)
let now () = Unix.time ()

(** [is_expired ttl] is [true] when [ttl] is an absolute expiry time that is
    already in the past.  Entries without a TTL never expire. *)
let is_expired = function
  | None -> false
  | Some expiry -> now () > expiry

(* Locate the live data file for [key]: not expired, not a chunk, not a
   sidecar.  Returns its directory, file name, TTL and flags.  A missing or
   unreadable directory means "no entry". *)
let find_entry base_dir key =
  let dir = key_dir base_dir key in
  let prefix = hash_rest_of_key key ^ ":2" in
  match Path.read_dir dir with
  | exception Eio.Io _ -> None
  | names ->
      List.find_map
        (fun name ->
          if not (String.starts_with ~prefix name) then None
          else
            match parse_filename name with
            | Some (_, ttl, flags)
              when not (is_expired ttl || Flags.is_chunk flags) ->
                Some (dir, name, ttl, flags)
            | Some _ | None -> None)
        names

(** [find_file base_dir key] is the path, TTL and flags of the live data file
    for [key].  Expired entries, chunk files and [".key"] sidecars are
    skipped. *)
let find_file base_dir key =
  Option.map
    (fun (dir, name, ttl, flags) -> (Path.(dir / name), ttl, flags))
    (find_entry base_dir key)

(* Create directory [path], tolerating one that already exists. *)
let mkdir_if_missing path =
  try Path.mkdir ~perm:0o755 path
  with Eio.Io (Eio.Fs.E (Already_exists _), _) -> ()

(** Ensure the fan-out directories for [key] exist under [t.base_dir] and
    return the innermost one. *)
let ensure_dirs t key =
  List.fold_left
    (fun parent d ->
      let dir = Path.(parent / d) in
      mkdir_if_missing dir;
      dir)
    t.base_dir (key_to_dirs key)

(** {1 Creation} *)

(** [create ~base_dir] opens a cache rooted at [base_dir] and returns it.
    It creates [base_dir] and its [tmp] subdirectory if they are missing;
    the parent of [base_dir] must already exist. *)
let create ~base_dir =
  mkdir_if_missing base_dir;
  let tmp_dir = Path.(base_dir / "tmp") in
  mkdir_if_missing tmp_dir;
  { base_dir; tmp_dir; mutex = Mutex.create () }

(** {1 Streaming Operations} *)

(** [get t ~key ~sw] opens the live data file for [key] for reading, with the
    file attached to [sw].  It returns [None] when [key] has no live entry.
    Takes no lock. *)
let get t ~key ~sw =
  find_file t.base_dir key
  |> Option.map (fun (path, _, _) ->
         (Path.open_in ~sw path :> Flow.source_ty Resource.t))

(* Fresh temporary path in [t.tmp_dir] for data belonging to [key].  The pid
   keeps names distinct across processes that share the directory, even when
   [Random] is unseeded. *)
let temp_path_for t key =
  let name =
    Printf.sprintf "tmp.%s.%d.%016Lx" (hash_key key) (Unix.getpid ())
      (Random.bits64 ())
  in
  Path.(t.tmp_dir / name)

(* Stream [source] into a new file at [path].  On failure, remove the partial
   file and re-raise.  The removal is protected so it also runs after
   cancellation. *)
let write_temp path source =
  try
    Path.with_open_out ~create:(`Or_truncate 0o644) path (fun sink ->
        Flow.copy source sink)
  with exn ->
    let bt = Printexc.get_raw_backtrace () in
    Cancel.protect (fun () -> try Path.unlink path with Eio.Io _ -> ());
    Printexc.raise_with_backtrace exn bt

(* Remove the data file [dir/name], ignoring one that is already gone, and
   then remove its sidecar on a best-effort basis. *)
let remove_entry_files dir name =
  (try Path.unlink Path.(dir / name)
   with Eio.Io (Eio.Fs.E (Not_found _), _) -> ());
  try Path.unlink (sidecar_path dir name) with Eio.Io _ -> ()

(** [put t ~key ~source ?ttl ?flags ()] stores the contents of [source] under
    [key] and replaces any live entry for it.

    [ttl] is relative (seconds from now).  It is stored as an absolute expiry
    time.  The data is first streamed to a temporary file in [t.tmp_dir],
    then moved into place with {!Path.rename}, so readers do not see a
    partially written file. *)
let put t ~key ~source ?(ttl = None) ?(flags = Flags.empty) () =
  let ttl_abs = Option.map (fun seconds -> now () +. seconds) ttl in
  let temp_path = temp_path_for t key in
  (* Stream outside the lock.  A slow source must not block other writers,
     and a failing one must not poison the mutex: Eio's [use_rw] disables
     the mutex if its body raises. *)
  write_temp temp_path source;
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  let previous = find_entry t.base_dir key in
  let dir = ensure_dirs t key in
  let final_name = key_to_filename ?ttl:ttl_abs ~flags key in
  (* Write the sidecar first so the data file never appears without it. *)
  Path.save ~create:(`Or_truncate 0o644) (sidecar_path dir final_name) key;
  Path.rename temp_path Path.(dir / final_name);
  (* Drop the previous entry, unless the rename just replaced it in place. *)
  match previous with
  | Some (old_dir, old_name, _, _) when not (String.equal old_name final_name)
    ->
      remove_entry_files old_dir old_name
  | Some _ | None -> ()

(** {1 Basic Operations} *)

(** [exists t ~key] is [true] when [key] has a live entry.  Takes no lock. *)
let exists t ~key = Option.is_some (find_file t.base_dir key)

(** [delete t ~key] removes the live entry for [key] and its sidecar, if
    there is one.  Only that entry's own sidecar is removed; chunk files and
    their sidecars are left alone. *)
let delete t ~key =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  match find_entry t.base_dir key with
  | Some (dir, name, _, _) -> remove_entry_files dir name
  | None -> ()

(** [size t ~key] is the size in bytes of the live entry for [key].  It is
    [None] when there is no live entry or the file cannot be stat-ed. *)
let size t ~key =
  match find_file t.base_dir key with
  | None -> None
  | Some (path, _, _) -> (
      match Path.stat ~follow:false path with
      | stat -> Some (Optint.Int63.to_int64 stat.size)
      | exception Eio.Io _ -> None)

(** [get_path t ~key] is the path of the live data file for [key]. *)
let get_path t ~key =
  Option.map (fun (path, _, _) -> path) (find_file t.base_dir key)

(** {1 Flag Operations} *)

(* Rename the live entry for [key] so that its name encodes
   [update current_flags], and rename the sidecar with it.  Does nothing when
   there is no live entry or the name would not change. *)
let update_flags t ~key update =
  Mutex.use_rw ~protect:false t.mutex @@ fun () ->
  match find_entry t.base_dir key with
  | None -> ()
  | Some (dir, old_name, ttl, flags) ->
      let new_name = key_to_filename ?ttl ~flags:(update flags) key in
      if not (String.equal new_name old_name) then begin
        Path.rename Path.(dir / old_name) Path.(dir / new_name);
        (* Keep the sidecar paired with the renamed data file. *)
        try Path.rename (sidecar_path dir old_name) (sidecar_path dir new_name)
        with Eio.Io (Eio.Fs.E (Not_found _), _) -> ()
      end

(** [set_flags t ~key flags] replaces the flags of the live entry for [key]. *)
let set_flags t ~key flags = update_flags t ~key (fun _ -> flags)

(** [add_flag t ~key flag] adds [flag] to the live entry for [key]. *)
let add_flag t ~key flag = update_flags t ~key (Flags.add flag)

(** [remove_flag t ~key flag] removes [flag] from the live entry for [key]. *)
let remove_flag t ~key flag = update_flags t ~key (Flags.remove flag)

(** [get_flags t ~key] is the flags of the live entry for [key].  Takes no
    lock. *)
let get_flags t ~key =
  Option.map (fun (_, _, flags) -> flags) (find_file t.base_dir key)

(** {1 Cache Management} *)

(* Cons [(dir, file name, entry)] onto [acc] for every cache file (data or
   chunk, never sidecars) under [base_dir/dir1].  Unreadable directories and
   files that cannot be stat-ed are skipped.  The key comes from the sidecar;
   if the sidecar cannot be read, the full hex hash is used instead. *)
let scan_dir_files base_dir acc dir1 =
  let list_or_empty dir = try Path.read_dir dir with Eio.Io _ -> [] in
  let dir1_path = Path.(base_dir / dir1) in
  List.fold_left
    (fun acc dir2 ->
      let dir2_path = Path.(dir1_path / dir2) in
      List.fold_left
        (fun acc filename ->
          match parse_filename filename with
          | None -> acc
          | Some (hash_rest, ttl, flags) -> (
              match Path.stat ~follow:false Path.(dir2_path / filename) with
              | exception Eio.Io _ -> acc
              | stat ->
                  let key =
                    try Path.load (sidecar_path dir2_path filename)
                    with Eio.Io _ -> dir1 ^ dir2 ^ hash_rest
                  in
                  let entry =
                    Entry.create ~key
                      ~size:(Optint.Int63.to_int64 stat.size)
                      ~mtime:stat.mtime ~ttl ~flags
                  in
                  (dir2_path, filename, entry) :: acc))
        acc (list_or_empty dir2_path))
    acc (list_or_empty dir1_path)

(** [scan_dir base_dir acc dir1] prepends to [acc] the entries found under
    [base_dir/dir1]. *)
let scan_dir base_dir acc dir1 =
  List.rev_append
    (List.rev_map (fun (_, _, entry) -> entry) (scan_dir_files base_dir [] dir1))
    acc

(* Top-level fan-out directories of [t]; the [tmp] directory is excluded. *)
let fanout_dirs t =
  match Path.read_dir t.base_dir with
  | exception Eio.Io _ -> []
  | names -> List.filter (fun name -> name <> "tmp") names

(* Every cache file in [t], paired with the location it was found at. *)
let scan_files t = List.fold_left (scan_dir_files t.base_dir) [] (fanout_dirs t)

(** [scan t] lists an entry for every data and chunk file in the cache.
    Takes no lock, so concurrent writers can make the result stale. *)
let scan t = List.fold_left (scan_dir t.base_dir) [] (fanout_dirs t)

(** [stats t] summarises {!scan}[ t]. *)
let stats t = Stats.of_entries (scan t)

(* Remove every scanned file whose entry satisfies [pred], together with its
   sidecar.  Each removal takes the lock separately.  Files are removed by
   location rather than looked up by key, so expired entries, chunks and
   entries without a sidecar are handled too.  Returns the number of files
   that matched. *)
let remove_matching t pred =
  List.fold_left
    (fun removed (dir, name, entry) ->
      if pred entry then begin
        Mutex.use_rw ~protect:false t.mutex (fun () ->
            remove_entry_files dir name);
        removed + 1
      end
      else removed)
    0 (scan_files t)

(** [expire t] removes every expired file and returns how many matched. *)
let expire t = remove_matching t Entry.is_expired

(** [clear t] removes every file that is not pinned. *)
let clear t =
  ignore (remove_matching t (fun entry -> not (Entry.is_pinned entry)) : int)

(** [clear_temporary t] removes every file flagged as temporary. *)
let clear_temporary t = ignore (remove_matching t Entry.is_temporary : int)

(** {1 Pretty Printing} *)

let pp fmt t = Format.fprintf fmt "Cache{base=%s}" (Path.native_exn t.base_dir)
435(** {1 Convenience Functions} *) 436 437let create_with_xdge xdge = 438 let base_dir = Xdge.cache_dir xdge in 439 create ~base_dir 440 441(** {1 Chunk Support} *) 442 443module Range = Range 444module Chunk = Chunk 445 446let put_chunk t ~key ~range ~source ?(ttl=None) ?(flags=Flags.empty) () = 447 (* Add Chunk flag *) 448 let flags = Flags.add `Chunk flags in 449 450 (* Generate filename with range *) 451 let hash = hash_key key in 452 let filename = Chunk.make_filename ~hash ~range ?ttl ~flags () in 453 454 (* Ensure directory structure *) 455 let dir = ensure_dirs t key in 456 457 (* Generate temporary filename using hashed key *) 458 (* Use timestamp and random for temporary file uniqueness *) 459 let rand_hex = Printf.sprintf "%016Lx" (Random.bits64 ()) in 460 let temp_name = Printf.sprintf "tmp.%s.%s.chunk" 461 (hash_key key) rand_hex in 462 let temp_path = Path.(t.tmp_dir / temp_name) in 463 464 (* Write to temporary file *) 465 Switch.run @@ fun sw -> 466 let sink = Path.open_out ~sw ~create:(`Or_truncate 0o644) temp_path in 467 Flow.copy source sink; 468 Flow.close sink; 469 470 (* Move to final location *) 471 let final_path = Path.(dir / filename) in 472 Path.rename temp_path final_path; 473 474 (* Write the sidecar .key file for chunks too *) 475 let key_file_path = Path.(dir / (filename ^ ".key")) in 476 Switch.run @@ fun sw -> 477 let key_sink = Path.open_out ~sw ~create:(`Or_truncate 0o644) key_file_path in 478 Flow.copy (Flow.string_source key) key_sink; 479 Flow.close key_sink 480 481let get_range t ~key ~range ~sw = 482 (* First check for complete file *) 483 match find_file t.base_dir key with 484 | Some (path, _, flags) when Flags.is_complete flags -> 485 (* We have complete file, create a range-limited source *) 486 let file = Path.open_in ~sw path in 487 (* Seek to start position *) 488 (try 489 let (_ : Optint.Int63.t) = Eio.File.seek file (Optint.Int63.of_int64 (Range.start range)) `Set in () 490 with _ -> ()); 491 `Complete (file :> 
Flow.source_ty Resource.t) 492 493 | _ -> 494 (* Look for chunks *) 495 let chunks = Chunk.find_chunks ~base_dir:t.base_dir ~key in 496 let overlapping = List.filter (fun c -> 497 Range.overlaps (Chunk.range c) range 498 ) chunks in 499 500 match overlapping with 501 | [] -> `Not_found 502 | chunks -> 503 let sources = List.map (fun chunk -> 504 let source = Path.open_in ~sw (Chunk.path chunk) in 505 (Chunk.range chunk, (source :> Flow.source_ty Resource.t)) 506 ) (Chunk.sort_by_range chunks) in 507 `Chunks sources 508 509let list_chunks t ~key = 510 Chunk.find_chunks ~base_dir:t.base_dir ~key 511 512let has_complete_chunks t ~key ~total_size = 513 let chunks = list_chunks t ~key in 514 Chunk.is_complete_sequence chunks ~total_size 515 516let missing_ranges t ~key ~total_size = 517 let chunks = list_chunks t ~key in 518 Chunk.missing_ranges chunks ~total_size 519 520let coalesce_chunks t ~key ?(verify=false) ?(flags=Flags.empty) ?(ttl=None) () = 521 (* Note: This returns a unit Promise that will be resolved when coalescing is done. 522 The caller should provide their own switch context for async execution. 
*) 523 let promise, resolver = Eio.Promise.create () in 524 (Mutex.use_rw ~protect:false t.mutex @@ fun () -> 525 let chunks = list_chunks t ~key in 526 let sorted = Chunk.sort_by_range chunks in 527 528 match sorted with 529 | [] -> 530 Eio.Promise.resolve resolver (Error (Failure "No chunks found")) 531 | _ -> 532 (* Calculate total size from chunks *) 533 let last = List.hd (List.rev sorted) in 534 let total_size = Int64.succ (Range.end_ (Chunk.range last)) in 535 536 (* Verify we have complete sequence *) 537 if not (Chunk.is_complete_sequence sorted ~total_size) then 538 Eio.Promise.resolve resolver (Error (Failure "Chunks do not form complete sequence")) 539 else begin 540 541 (* Create final file *) 542 let final_filename = key_to_filename ?ttl ~flags key in 543 (* Use timestamp and random for temporary file uniqueness *) 544 let rand_hex = Printf.sprintf "%016Lx" (Random.bits64 ()) in 545 let temp_name = Printf.sprintf "tmp.%s.%s.coalesce" 546 (hash_key key) rand_hex in 547 let temp_path = Path.(t.tmp_dir / temp_name) in 548 let dir = ensure_dirs t key in 549 let final_path = Path.(dir / final_filename) in 550 551 try 552 (* Concatenate all chunks *) 553 Switch.run @@ fun sw -> 554 let sink = Path.open_out ~sw ~create:(`Or_truncate 0o644) temp_path in 555 556 List.iter (fun chunk -> 557 let source = Path.open_in ~sw (Chunk.path chunk) in 558 Flow.copy source sink; 559 Flow.close source 560 ) sorted; 561 562 Flow.close sink; 563 564 (* If verify is requested, check the assembled file's size *) 565 let verification_ok = 566 if verify then 567 (* Verify that the final file has expected size *) 568 try 569 let stat = Path.stat ~follow:false temp_path in 570 let file_size = Optint.Int63.to_int64 stat.size in 571 file_size = total_size 572 with _ -> false 573 else 574 true 575 in 576 577 if not verification_ok then begin 578 (* Delete temp file and fail *) 579 (try Path.unlink temp_path with _ -> ()); 580 Eio.Promise.resolve resolver (Error (Failure "Size 
verification failed")) 581 end else begin 582 (* Atomic rename *) 583 Path.rename temp_path final_path; 584 585 (* Clean up chunks *) 586 List.iter (fun chunk -> 587 try Path.unlink (Chunk.path chunk) 588 with _ -> () 589 ) chunks; 590 591 (* Resolve the promise with success *) 592 Eio.Promise.resolve resolver (Ok ()) 593 end 594 with exn -> 595 (* Clean up temp file on error *) 596 (try Path.unlink temp_path with _ -> ()); 597 Eio.Promise.resolve resolver (Error exn) 598 end); 599 promise 600 601let cleanup_chunks t ~key = 602 let chunks = list_chunks t ~key in 603 List.iter (fun chunk -> 604 try 605 let path = Chunk.path chunk in 606 Path.unlink path; 607 (* Also remove the sidecar .key file if it exists *) 608 (* We need to derive the key file path from the chunk path *) 609 (* Since chunk.path is the full path to the chunk file, we can't decompose it easily *) 610 (* Instead, let's find and remove any matching .key files in the directory *) 611 let dirs = key_to_dirs (Chunk.key chunk) in 612 let dir_path = List.fold_left (fun acc d -> Path.(acc / d)) t.base_dir dirs in 613 (try 614 let entries = Path.read_dir dir_path in 615 List.iter (fun entry -> 616 if String.ends_with ~suffix:".key" entry && String.starts_with ~prefix:(Chunk.hash chunk |> fun h -> String.sub h 4 (String.length h - 4)) entry then 617 (try Path.unlink Path.(dir_path / entry) with _ -> ()) 618 ) entries 619 with _ -> ()) 620 with _ -> () 621 ) chunks