My agentic slop goes here. Not intended for anyone else!
at main 7.4 kB view raw
(** Bidirectional JSON-RPC session management with request/response
    correlation.

    A session owns a transport, runs a background fibre that reads incoming
    messages, dispatches incoming requests/notifications to user handlers,
    and matches incoming responses to the outgoing requests awaiting them. *)

(** {1 Handlers} *)

(** Called for every incoming request (a message that carries an id). The
    returned JSON is sent back as the result. If the handler raises, the
    exception text is sent back as an [Internal_error] response. *)
type request_handler =
  method_:string ->
  params:Jsont.json option ->
  Jsont.json

(** Called for every incoming notification (a message without an id).
    No response is sent. *)
type notification_handler =
  method_:string ->
  params:Jsont.json option ->
  unit

(** {1 Configuration} *)

(** Existential wrapper so that any Eio clock can be stored in [config]. *)
type clock = C : _ Eio.Time.clock -> clock

type config = {
  transport : Transport.t;
  request_handler : request_handler;
  notification_handler : notification_handler;
  timeout : float option;
  clock : clock option;
      (** Clock for timeout handling. Required if timeout is set. *)
}

(** {1 Exceptions} *)

exception Timeout of string
exception Session_closed
exception Unknown_response of Jsonrpc.Id.t
exception Remote_error of Jsonrpc.Error_data.t

(** {1 Internal Types} *)

type response_result =
  | Success of Jsont.json
  | Error of exn

type pending_request = {
  id : Jsonrpc.Id.t;
  resolver : response_result Eio.Promise.u;
  mutable cancelled : bool;
      (* Set as soon as the request has been settled (response, timeout,
         send failure or session close) so that the promise is resolved at
         most once. *)
}

type t = {
  transport : Transport.t;
  mutable next_id : int;
  pending : (Jsonrpc.Id.t, pending_request) Hashtbl.t;
  request_handler : request_handler;
  notification_handler : notification_handler;
  timeout : float option;
  clock : clock option;
  sw : Eio.Switch.t;
  mutable closed : bool;
}

(** {1 Helper Functions} *)

(** Encode a JSON-RPC message to JSON.
    @raise Failure if the encoder reports an error. *)
let encode_message msg =
  match Jsont.Json.encode Jsonrpc.Message.jsont msg with
  | Ok json -> json
  | Stdlib.Result.Error e -> failwith ("Failed to encode message: " ^ e)

(** Send a JSON value over the session's transport.
    @raise Session_closed if the session has been closed. *)
let send_json t json =
  if t.closed then raise Session_closed;
  Transport.send t.transport json

(** Fail every request that is still waiting with [Session_closed] and empty
    the pending table. Shared by {!close} and the end-of-transport path so
    both settle pending requests in exactly the same way. *)
let fail_all_pending t =
  Hashtbl.iter
    (fun _ pending ->
      (* Mark first so that a timeout fibre waking up later does nothing. *)
      pending.cancelled <- true;
      Eio.Promise.resolve pending.resolver (Error Session_closed))
    t.pending;
  Hashtbl.clear t.pending

(** Handle an incoming request message.

    A message without an id is a notification: the notification handler is
    called and nothing is sent back. Otherwise the request handler is called
    and its result (or the exception it raised, as an [Internal_error]) is
    sent back to the peer.

    Only exceptions raised by the user's handler become error responses;
    a failure while sending the response propagates to the caller.
    [Eio.Cancel.Cancelled] is always re-raised. *)
let handle_request t req =
  let open Jsonrpc in
  let method_ = req.Request.method_ in
  let params = req.Request.params in
  match req.Request.id with
  | None ->
    (* Notification: no response is expected. *)
    t.notification_handler ~method_ ~params
  | Some id ->
    let response =
      match t.request_handler ~method_ ~params with
      | result -> Response.make_result ~id ~result
      | exception (Eio.Cancel.Cancelled _ as exn) ->
        (* Cancellation must not be reported to the peer as an error. *)
        raise exn
      | exception exn ->
        let error =
          Error_data.make
            ~code:Internal_error
            ~message:(Printexc.to_string exn)
            ()
        in
        Response.make_error ~id ~error
    in
    send_json t (encode_message (Message.Response response))

(** Resolve the pending request that matches an incoming response.
    @raise Unknown_response if no request with that id is waiting (for
    example because it already timed out). *)
let resolve_response t resp =
  let open Jsonrpc in
  let id = resp.Response.id in
  match Hashtbl.find_opt t.pending id with
  | None -> raise (Unknown_response id)
  | Some pending ->
    Hashtbl.remove t.pending id;
    (* Mark as settled so the timeout fibre does not resolve it again. *)
    pending.cancelled <- true;
    let outcome =
      match resp.Response.value with
      | Ok result -> Success result
      | Stdlib.Result.Error error -> Error (Remote_error error)
    in
    Eio.Promise.resolve pending.resolver outcome

(** Background receive loop: reads messages from the transport and routes
    them until the session is closed or the transport reports end of input. *)
let rec receive_loop t =
  if t.closed then ()
  else
    match Transport.receive t.transport with
    | None ->
      (* Transport closed: nobody will ever answer the pending requests. *)
      t.closed <- true;
      fail_all_pending t
    | Some json ->
      (try
         match Jsonrpc.Message.classify json with
         | Jsonrpc.Message.Request req ->
           (* Run the handler in its own fibre so a slow handler does not
              block the receive loop. fork_promise keeps a handler failure
              from failing the switch; the failure is logged here because
              the promise itself is discarded. *)
           let handled =
             Eio.Fiber.fork_promise ~sw:t.sw (fun () ->
               try handle_request t req with
               | Eio.Cancel.Cancelled _ as exn -> raise exn
               | Session_closed ->
                 (* The session went away before the reply could be sent. *)
                 ()
               | exn ->
                 Printf.eprintf "Error handling request: %s\n%!"
                   (Printexc.to_string exn))
           in
           ignore (handled : unit Eio.Promise.or_exn)
         | Jsonrpc.Message.Response resp -> resolve_response t resp
       with
       | Eio.Cancel.Cancelled _ as exn -> raise exn
       | exn ->
         (* A single bad message must not stop the session. *)
         Printf.eprintf "Error in receive loop: %s\n%!"
           (Printexc.to_string exn));
      receive_loop t

(** {1 Public API} *)

(** [create ~sw config] creates a session and starts its receive loop as a
    fibre attached to [sw].
    @raise Invalid_argument if [config.timeout] is set but [config.clock]
    is not. *)
let create ~sw (config : config) : t =
  (match config.timeout, config.clock with
   | Some _, None ->
     invalid_arg "Session.create: clock must be provided when timeout is set"
   | Some _, Some _ | None, _ -> ());
  let t = {
    transport = config.transport;
    next_id = 1;
    pending = Hashtbl.create 16;
    request_handler = config.request_handler;
    notification_handler = config.notification_handler;
    timeout = config.timeout;
    clock = config.clock;
    sw;
    closed = false;
  } in
  Eio.Fiber.fork ~sw (fun () -> receive_loop t);
  t

(** [send_request t ~method_ ?params ()] sends a request and blocks the
    calling fibre until the matching response arrives.
    @return the result JSON of a successful response.
    @raise Session_closed if the session is closed before or while waiting.
    @raise Remote_error if the peer answered with an error.
    @raise Timeout if a timeout is configured and expires first. *)
let send_request t ~method_ ?params () =
  if t.closed then raise Session_closed;

  (* Allocate a fresh request id. *)
  let id = `Number (float_of_int t.next_id) in
  t.next_id <- t.next_id + 1;

  (* Encode before registering anything, so that an encoding failure cannot
     leave a stale entry in the pending table. *)
  let req = Jsonrpc.Request.make ~method_ ?params ~id () in
  let json = encode_message (Jsonrpc.Message.Request req) in

  let promise, resolver = Eio.Promise.create () in
  let pending = { id; resolver; cancelled = false } in
  Hashtbl.replace t.pending id pending;

  (* Arm the timeout, if configured. The timer is a daemon fibre so that a
     request that has already completed does not keep the switch alive for
     the rest of the timeout period. *)
  (match t.timeout, t.clock with
   | None, _ | _, None -> ()
   | Some timeout_sec, Some (C clock) ->
     Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
       Eio.Time.sleep clock timeout_sec;
       if not pending.cancelled then begin
         pending.cancelled <- true;
         Hashtbl.remove t.pending id;
         let msg =
           Printf.sprintf "Request timeout after %.1fs: %s" timeout_sec method_
         in
         Eio.Promise.resolve pending.resolver (Error (Timeout msg))
       end;
       `Stop_daemon));

  (* Send the request; if that fails nobody will ever answer, so drop the
     pending entry before propagating the error. *)
  (try send_json t json with
   | exn ->
     let bt = Printexc.get_raw_backtrace () in
     pending.cancelled <- true;
     Hashtbl.remove t.pending id;
     Printexc.raise_with_backtrace exn bt);

  (* Wait for the response, timeout or session close. *)
  match Eio.Promise.await promise with
  | Success result -> result
  | Error exn -> raise exn

(** [send_notification t ~method_ ?params ()] sends a notification (a
    request without an id) and returns without waiting for anything.
    @raise Session_closed if the session is closed. *)
let send_notification t ~method_ ?params () =
  if t.closed then raise Session_closed;
  let req = Jsonrpc.Request.make ~method_ ?params () in
  send_json t (encode_message (Jsonrpc.Message.Request req))

(** [close t] closes the session: every pending request fails with
    [Session_closed] and the transport is closed. Calling it again is a
    no-op. *)
let close t =
  if not t.closed then begin
    t.closed <- true;
    fail_all_pending t;
    Transport.close t.transport
  end

(** [is_closed t] is [true] once the session has been closed, either by
    {!close} or because the transport reported end of input. *)
let is_closed t = t.closed