My agentic slop goes here. Not intended for anyone else!
at main 4.2 kB view raw
(** Bidirectional JSON-RPC session management with request/response
    correlation.

    This module provides a high-level session abstraction over a transport
    layer, handling request ID generation, response correlation via promises,
    and bidirectional message routing using Eio structured concurrency.

    {1 Architecture}

    Sessions run a background receive loop in an Eio fiber that continuously
    reads from the transport and routes messages:
    - Incoming requests → dispatched to [request_handler] in a new fiber
    - Incoming responses → resolve pending promises
    - Incoming notifications → dispatched to [notification_handler]

    Outgoing messages (requests and notifications) are sent directly on the
    transport. Each outgoing request is tracked by a promise that is resolved
    when the corresponding response arrives; {!send_request} awaits that
    promise and returns the result.

    {1 Example Usage}

    {[
      Eio_main.run @@ fun env ->
      let config = {
        transport;
        request_handler = (fun ~method_ ~params ->
          (* Handle incoming requests *)
          match method_ with
          | "ping" -> `String "pong"
          | _ -> failwith "Unknown method"
        );
        notification_handler = (fun ~method_ ~params ->
          (* Handle incoming notifications *)
          Printf.printf "Notification: %s\n" method_
        );
        timeout = Some 30.0;  (* 30 second timeout *)
        clock = Some (C (Eio.Stdenv.clock env));
      }
      in

      Eio.Switch.run @@ fun sw ->
      let session = Session.create ~sw config in

      (* Send a request and wait for response *)
      let response = Session.send_request session
        ~method_:"initialize"
        ~params:(`Object [("version", `String "1.0")])
        ()
      in

      (* Send a notification (no response expected) *)
      Session.send_notification session
        ~method_:"progress"
        ~params:(`Object [("percent", `Number 50.0)])
        ()
    ]} *)

(** {1 Handlers} *)

type request_handler =
  method_:string ->
  params:Jsont.json option ->
  Jsont.json
(** Handler for incoming requests. Should return the result value.
    May raise exceptions, which will be converted to JSON-RPC errors. *)

type notification_handler =
  method_:string ->
  params:Jsont.json option ->
  unit
(** Handler for incoming notifications. No response is expected. *)

(** {1 Configuration} *)

type clock = C : _ Eio.Time.clock -> clock
(** Existential wrapper hiding the clock's type parameter, so that any
    [Eio.Time.clock] can be stored in {!config}. *)

type config = {
  transport : Transport.t;
  (** Transport layer for sending/receiving JSON messages. *)
  request_handler : request_handler;
  (** Handler for incoming requests. *)
  notification_handler : notification_handler;
  (** Handler for incoming notifications. *)
  timeout : float option;
  (** Request timeout in seconds. [None] means no timeout. *)
  clock : clock option;
  (** Clock for timeout handling. Required if [timeout] is set. *)
}
(** Session configuration. *)

(** {1 Session Management} *)

type t
(** Session handle. *)

exception Timeout of string
(** Raised when a request times out. *)

exception Session_closed
(** Raised when attempting to use a closed session. *)

exception Unknown_response of Jsonrpc.Id.t
(** Raised when receiving a response for an unknown request ID. *)

exception Remote_error of Jsonrpc.Error_data.t
(** Raised when the remote side returns an error response. *)

val create :
  sw:Eio.Switch.t ->
  config ->
  t
(** [create ~sw config] creates and starts a session with a background
    receive loop. The receive loop runs in a background fiber attached to
    [sw]. *)

val send_request :
  t ->
  method_:string ->
  ?params:Jsont.json ->
  unit ->
  Jsont.json
(** [send_request t ~method_ ?params ()] sends a request and awaits the
    response, returning its result value.

    @raise Timeout if the request times out.
    @raise Remote_error if the server returns an error.
    @raise Session_closed if the session is closed. *)

val send_notification :
  t ->
  method_:string ->
  ?params:Jsont.json ->
  unit ->
  unit
(** [send_notification t ~method_ ?params ()] sends a notification (no
    response expected).

    @raise Session_closed if the session is closed. *)

val close : t -> unit
(** [close t] closes the session and the underlying transport.
    This will cancel all pending requests. *)

val is_closed : t -> bool
(** [is_closed t] checks whether the session is closed. *)