My agentic slop goes here. Not intended for anyone else!
1(** Bidirectional JSON-RPC session management with request/response correlation *)
2
3(** {1 Handlers} *)
4
(** Callback invoked for an incoming request that carries an id.

    It is called with the request's method name and optional parameters;
    the JSON value it returns is sent back to the peer as the result. *)
type request_handler =
  method_:string -> params:Jsont.json option -> Jsont.json
9
(** Callback invoked for an incoming notification (a request without an
    id).

    It is called with the method name and optional parameters and returns
    nothing, since notifications are never answered. *)
type notification_handler =
  method_:string -> params:Jsont.json option -> unit
14
15(** {1 Configuration} *)
16
(** Existential wrapper around an Eio clock: it hides the clock's type
    parameter so that any clock can be stored in a session. *)
type clock =
  | C : _ Eio.Time.clock -> clock
18
(** Settings consumed by [create] to build a session. *)
type config = {
  transport : Transport.t;
      (** Channel over which JSON messages are sent and received. *)
  request_handler : request_handler;
      (** Called for incoming requests that expect a response. *)
  notification_handler : notification_handler;
      (** Called for incoming notifications. *)
  timeout : float option;
      (** Seconds to wait for a response to an outgoing request; [None]
          disables the timeout. *)
  clock : clock option;
      (** Clock used to measure [timeout]. Must be [Some _] whenever
          [timeout] is [Some _]; [create] rejects the configuration
          otherwise. *)
}
27
28(** {1 Exceptions} *)
29
(** No response arrived within the configured timeout. The payload is a
    human-readable message naming the timeout and the method. *)
exception Timeout of string

(** The session is closed: raised when sending on a closed session and
    delivered to requests still pending when the session closes. *)
exception Session_closed

(** A response arrived whose id matches no pending request. *)
exception Unknown_response of Jsonrpc.Id.t

(** The peer answered a request with an error; the payload is that
    error. *)
exception Remote_error of Jsonrpc.Error_data.t
34
35(** {1 Internal Types} *)
36
(** Outcome handed to the fiber waiting on an outgoing request.

    Note that the [Error] constructor declared here shadows
    [Stdlib.Result.Error] for the remainder of the file. *)
type response_result =
  | Success of Jsont.json  (** Result value returned by the peer. *)
  | Error of exn  (** Exception to raise in the waiting fiber. *)
40
(** Bookkeeping for one outgoing request that has not been answered yet. *)
type pending_request = {
  id : Jsonrpc.Id.t;  (** Id the response will be correlated with. *)
  resolver : response_result Eio.Promise.u;
      (** Resolver of the promise the requesting fiber awaits. *)
  mutable cancelled : bool;
      (** Set once the request has been completed, so that a later
          timeout does not fire for it. *)
}
46
(** A live JSON-RPC session. *)
type t = {
  transport : Transport.t;  (** Underlying message channel. *)
  mutable next_id : int;
      (** Numeric id to assign to the next outgoing request. *)
  pending : (Jsonrpc.Id.t, pending_request) Hashtbl.t;
      (** Outgoing requests still waiting for a response, keyed by id. *)
  request_handler : request_handler;
      (** Handler for incoming requests. *)
  notification_handler : notification_handler;
      (** Handler for incoming notifications. *)
  timeout : float option;
      (** Per-request timeout in seconds, if any. *)
  clock : clock option;  (** Clock used for timeouts, if any. *)
  sw : Eio.Switch.t;
      (** Switch that owns the fibers forked by the session. *)
  mutable closed : bool;
      (** [true] once the session has been closed or the transport has
          reported end of input. *)
}
58
59(** {1 Helper Functions} *)
60
(** Encode a JSON-RPC message as a JSON value.

    @raise Failure if the encoder reports an error. *)
let encode_message msg =
  let encoded = Jsont.Json.encode Jsonrpc.Message.jsont msg in
  match encoded with
  | Stdlib.Result.Error reason ->
    failwith ("Failed to encode message: " ^ reason)
  | Stdlib.Result.Ok json -> json
65
(** Hand a JSON value to the session's transport.

    @raise Session_closed if the session is already closed. *)
let send_json t json =
  match t.closed with
  | true -> raise Session_closed
  | false -> Transport.send t.transport json
69
(** Dispatch an incoming request to the user's handlers.

    A request without an id is a notification: it is passed to
    [t.notification_handler] and nothing is sent back. Otherwise
    [t.request_handler] is called and its result is sent as a success
    response. If the handler, or the encoding of its result, raises, an
    [Internal_error] response whose message is [Printexc.to_string exn]
    is sent instead.

    Exceptions raised by the notification handler or while sending the
    response (for example [Session_closed]) propagate to the caller. *)
let handle_request t req =
  let open Jsonrpc in
  let method_ = req.Request.method_ in
  let params = req.Request.params in
  match req.Request.id with
  | None ->
    (* Notification: run the handler and never respond. *)
    t.notification_handler ~method_ ~params
  | Some id ->
    (* Build the error response describing a handler failure. *)
    let error_response exn =
      let error =
        Error_data.make
          ~code:Internal_error
          ~message:(Printexc.to_string exn)
          ()
      in
      Message.Response (Response.make_error ~id ~error)
    in
    (* Only the handler call and the encoding of its result are guarded.
       A failing send is not retried as an error response, because that
       would go through the same failing path. *)
    let json =
      try
        let result = t.request_handler ~method_ ~params in
        encode_message (Message.Response (Response.make_result ~id ~result))
      with
      | Eio.Cancel.Cancelled _ as exn ->
        (* Cancellation must not be converted into a reply. *)
        raise exn
      | exn -> encode_message (error_response exn)
    in
    send_json t json
107
(** Deliver an incoming response to the fiber waiting for it.

    The matching entry is removed from [t.pending], flagged as completed
    and its promise resolved with either the result or a [Remote_error].

    @raise Unknown_response if no pending request has the response's id. *)
let resolve_response t resp =
  let open Jsonrpc in
  let id = resp.Response.id in
  let entry =
    match Hashtbl.find_opt t.pending id with
    | Some entry -> entry
    | None -> raise (Unknown_response id)
  in
  Hashtbl.remove t.pending id;
  (* Flag the request as completed so a timeout does not fire for it. *)
  entry.cancelled <- true;
  let outcome =
    match resp.Response.value with
    | Ok result -> Success result
    | Stdlib.Result.Error error -> Error (Remote_error error)
  in
  Eio.Promise.resolve entry.resolver outcome
126
(** Background loop that reads messages from the transport and routes
    them until the session is closed.

    Requests are handled in a freshly forked fiber so a slow handler does
    not stall the loop; responses resolve the matching pending request.
    An exception raised while routing a message is printed to stderr and
    the loop carries on. When the transport yields [None] the session is
    marked closed and every pending request fails with
    [Session_closed]. *)
let rec receive_loop t =
  if not t.closed then
    match Transport.receive t.transport with
    | Some json ->
      let route () =
        let msg = Jsonrpc.Message.classify json in
        match msg with
        | Request req ->
          ignore
            (Eio.Fiber.fork_promise ~sw:t.sw (fun () ->
                 handle_request t req))
        | Response resp -> resolve_response t resp
      in
      (try route ()
       with exn ->
         Printf.eprintf "Error in receive loop: %s\n%!"
           (Printexc.to_string exn));
      receive_loop t
    | None ->
      (* Transport closed: fail everything that is still waiting. *)
      t.closed <- true;
      let fail_waiter _id waiter =
        Eio.Promise.resolve waiter.resolver (Error Session_closed)
      in
      Hashtbl.iter fail_waiter t.pending;
      Hashtbl.clear t.pending
156
157(** {1 Public API} *)
158
(** [create ~sw config] builds a session from [config] and forks its
    receive loop on [sw].

    @raise Invalid_argument if [config.timeout] is set but [config.clock]
    is not. *)
let create ~sw (config : config) : t =
  (* A timeout cannot be measured without a clock. *)
  (match config.timeout, config.clock with
   | Some _, None ->
     invalid_arg "Session.create: clock must be provided when timeout is set"
   | Some _, Some _ | None, _ -> ());

  let session =
    {
      transport = config.transport;
      next_id = 1;
      pending = Hashtbl.create 16;
      request_handler = config.request_handler;
      notification_handler = config.notification_handler;
      timeout = config.timeout;
      clock = config.clock;
      sw;
      closed = false;
    }
  in

  (* The receive loop runs in the background for the session's lifetime. *)
  Eio.Fiber.fork ~sw (fun () -> receive_loop session);
  session
182
(** [send_request t ~method_ ?params ()] sends a request to the peer and
    blocks the calling fiber until the matching response arrives,
    returning the response's result.

    When the session has both a timeout and a clock, the wait is bounded
    by that timeout, measured from the moment the request has been handed
    to the transport. No fiber outlives the call, and the pending-table
    entry is removed however the call ends (response, timeout, send
    failure or cancellation of the caller).

    @raise Session_closed if the session is closed before the request is
    sent or while it is waiting for the response.
    @raise Timeout if no response arrives within the configured timeout.
    @raise Remote_error if the peer answers with an error.
    @raise Failure if the request cannot be encoded. *)
let send_request t ~method_ ?params () =
  if t.closed then raise Session_closed;

  (* Ids are allocated sequentially and sent as JSON numbers. *)
  let id = `Number (float_of_int t.next_id) in
  t.next_id <- t.next_id + 1;

  (* Register the request before sending, so that a response arriving
     immediately still finds its entry. *)
  let promise, resolver = Eio.Promise.create () in
  let pending = { id; resolver; cancelled = false } in
  Hashtbl.replace t.pending id pending;

  let await_response () =
    match t.timeout, t.clock with
    | None, _ | _, None -> Eio.Promise.await promise
    | Some timeout_sec, Some (C clock) ->
      (match
         Eio.Time.with_timeout clock timeout_sec (fun () ->
             Ok (Eio.Promise.await promise))
       with
       | Ok outcome -> outcome
       | Stdlib.Result.Error `Timeout ->
         (match Eio.Promise.peek promise with
          | Some outcome ->
            (* The response raced the timer and won: honour it. *)
            outcome
          | None ->
            pending.cancelled <- true;
            let msg =
              Printf.sprintf "Request timeout after %.1fs: %s"
                timeout_sec method_
            in
            Error (Timeout msg)))
  in

  let outcome =
    Fun.protect
      (* Removing an id that is already gone is a no-op, so this is safe
         on the success path too. *)
      ~finally:(fun () -> Hashtbl.remove t.pending id)
      (fun () ->
        let req = Jsonrpc.Request.make ~method_ ?params ~id () in
        send_json t (encode_message (Jsonrpc.Message.Request req));
        await_response ())
  in
  match outcome with
  | Success result -> result
  | Error exn -> raise exn
232
(** [send_notification t ~method_ ?params ()] sends a notification, i.e.
    a request without an id, and does not wait for any reply.

    @raise Session_closed if the session is closed.
    @raise Failure if the notification cannot be encoded. *)
let send_notification t ~method_ ?params () =
  if t.closed then raise Session_closed
  else
    (* Omitting the id is what makes this a notification. *)
    let notification = Jsonrpc.Request.make ~method_ ?params () in
    send_json t (encode_message (Jsonrpc.Message.Request notification))
241
(** [close t] closes the session: every request still pending fails with
    [Session_closed], the pending table is emptied and the transport is
    closed. Calling it on an already closed session does nothing. *)
let close t =
  match t.closed with
  | true -> ()
  | false ->
    t.closed <- true;
    (* Wake every waiting fiber with [Session_closed]. *)
    let abort _id waiter =
      Eio.Promise.resolve waiter.resolver (Error Session_closed)
    in
    Hashtbl.iter abort t.pending;
    Hashtbl.clear t.pending;
    Transport.close t.transport
253
(** [is_closed t] is [true] once the session has been closed, either
    explicitly or because the transport reported end of input. *)
let is_closed { closed; _ } = closed