FastCGI implementation in OCaml
1open Fastcgi_record 2 3(** {1 Request Roles} *) 4 5type role = 6 | Responder 7 | Authorizer 8 | Filter 9 10let pp_role ppf = function 11 | Responder -> Format.pp_print_string ppf "Responder" 12 | Authorizer -> Format.pp_print_string ppf "Authorizer" 13 | Filter -> Format.pp_print_string ppf "Filter" 14 15let role_of_begin_request record = 16 if record.record_type <> Begin_request then 17 Error "Expected BEGIN_REQUEST record" 18 else if String.length record.content <> 8 then 19 Error "Invalid BEGIN_REQUEST content length" 20 else 21 let content = record.content in 22 let role_int = (Char.code content.[0] lsl 8) lor (Char.code content.[1]) in 23 match role_int with 24 | 1 -> Ok Responder 25 | 2 -> Ok Authorizer 26 | 3 -> Ok Filter 27 | n -> Error (Printf.sprintf "Unknown FastCGI role: %d" n) 28 29(** {1 Request Context} *) 30 31type t = { 32 request_id : request_id; 33 role : role; 34 keep_conn : bool; 35 params : KV.t; 36 stdin_data : string; 37 data_stream : string option; 38} 39 40let pp ppf request = 41 let data_str = match request.data_stream with 42 | None -> "None" 43 | Some d -> Printf.sprintf "Some(%d bytes)" (String.length d) 44 in 45 Format.fprintf ppf 46 "@[<2>{ request_id = %d;@ role = %a;@ keep_conn = %b;@ params = %a;@ stdin = %d bytes;@ data = %s }@]" 47 request.request_id 48 pp_role request.role 49 request.keep_conn 50 (KV.pp) request.params 51 (String.length request.stdin_data) 52 data_str 53 54let create record = 55 match role_of_begin_request record with 56 | Error _ as e -> e 57 | Ok role -> 58 if String.length record.content <> 8 then 59 Error "Invalid BEGIN_REQUEST content length" 60 else 61 let flags_int = Char.code record.content.[2] in 62 let keep_conn = (flags_int land 1) <> 0 in 63 Ok { 64 request_id = record.request_id; 65 role; 66 keep_conn; 67 params = KV.empty; 68 stdin_data = ""; 69 data_stream = if role = Filter then Some "" else None; 70 } 71 72 73(** {1 Stream Processing} *) 74 75(** Helper functions for result binding to simplify nested pattern matching *) 76let ( let* ) = Result.bind 77 78let is_stream_terminator record = 79 String.length record.content = 0 80 81 82let read_params_from_flow ~sw:_ buf_read = 83 Printf.eprintf "[DEBUG] read_params_from_flow: Starting\n%!"; 84 let params = ref KV.empty in 85 let rec loop () = 86 try 87 Printf.eprintf "[DEBUG] read_params_from_flow: Reading next PARAMS record\n%!"; 88 let record = Fastcgi_record.read buf_read in 89 Printf.eprintf "[DEBUG] read_params_from_flow: Got record type=%s, content_length=%d\n%!" 90 (Format.asprintf "%a" pp_record record.record_type) 91 (String.length record.content); 92 if record.record_type <> Params then 93 Error (Printf.sprintf "Expected PARAMS record, got %s" 94 (Format.asprintf "%a" pp_record record.record_type)) 95 else if is_stream_terminator record then ( 96 Printf.eprintf "[DEBUG] read_params_from_flow: Got stream terminator, returning %d params\n%!" 97 (Fastcgi_record.KV.cardinal !params); 98 Ok !params 99 ) else ( 100 let record_params = KV.decode record.content in 101 Printf.eprintf "[DEBUG] read_params_from_flow: Decoded %d params from record\n%!" 102 (Fastcgi_record.KV.cardinal record_params); 103 params := KV.to_seq record_params 104 |> Seq.fold_left (fun acc (k, v) -> KV.add k v acc) !params; 105 loop () 106 ) 107 with 108 | End_of_file -> 109 Printf.eprintf "[DEBUG] read_params_from_flow: Hit End_of_file\n%!"; 110 Error "Unexpected end of stream while reading PARAMS" 111 | exn -> 112 Printf.eprintf "[DEBUG] read_params_from_flow: Exception: %s\n%!" (Printexc.to_string exn); 113 Error (Printf.sprintf "Error reading PARAMS: %s" (Printexc.to_string exn)) 114 in 115 loop () 116 117let read_stdin_from_flow ~sw:_ buf_read = 118 Printf.eprintf "[DEBUG] read_stdin_from_flow: Starting\n%!"; 119 let data = Buffer.create 1024 in 120 let rec loop () = 121 try 122 Printf.eprintf "[DEBUG] read_stdin_from_flow: Reading next STDIN record\n%!"; 123 let record = Fastcgi_record.read buf_read in 124 Printf.eprintf "[DEBUG] read_stdin_from_flow: Got record type=%s, content_length=%d\n%!" 125 (Format.asprintf "%a" pp_record record.record_type) 126 (String.length record.content); 127 if record.record_type <> Stdin then 128 Error (Printf.sprintf "Expected STDIN record, got %s" 129 (Format.asprintf "%a" pp_record record.record_type)) 130 else if is_stream_terminator record then ( 131 Printf.eprintf "[DEBUG] read_stdin_from_flow: Got stream terminator, total stdin=%d bytes\n%!" 132 (Buffer.length data); 133 Ok (Buffer.contents data) 134 ) else ( 135 Buffer.add_string data record.content; 136 Printf.eprintf "[DEBUG] read_stdin_from_flow: Added %d bytes, total now %d\n%!" 137 (String.length record.content) (Buffer.length data); 138 loop () 139 ) 140 with 141 | End_of_file -> 142 Printf.eprintf "[DEBUG] read_stdin_from_flow: Hit End_of_file\n%!"; 143 Error "Unexpected end of stream while reading STDIN" 144 | exn -> 145 Printf.eprintf "[DEBUG] read_stdin_from_flow: Exception: %s\n%!" (Printexc.to_string exn); 146 Error (Printf.sprintf "Error reading STDIN: %s" (Printexc.to_string exn)) 147 in 148 loop () 149 150(** Read DATA stream for Filter role *) 151let read_data_from_flow buf_read = 152 let data_buf = Buffer.create 1024 in 153 let rec read_data () = 154 try 155 let record = Fastcgi_record.read buf_read in 156 if record.record_type <> Data then 157 Error "Expected DATA record" 158 else if is_stream_terminator record then 159 Ok (Buffer.contents data_buf) 160 else ( 161 Buffer.add_string data_buf record.content; 162 read_data () 163 ) 164 with 165 | End_of_file -> Error "Unexpected end of DATA stream" 166 | exn -> Error (Printf.sprintf "Error reading DATA: %s" (Printexc.to_string exn)) 167 in 168 read_data () 169 170(** Read request streams based on role *) 171let read_request_streams ~sw request buf_read = 172 Printf.eprintf "[DEBUG] read_request_streams: Processing role=%s\n%!" 173 (Format.asprintf "%a" pp_role request.role); 174 match request.role with 175 | Authorizer -> 176 Printf.eprintf "[DEBUG] read_request_streams: Authorizer role, no streams to read\n%!"; 177 Ok request 178 | Responder -> 179 Printf.eprintf "[DEBUG] read_request_streams: Responder role, reading STDIN\n%!"; 180 let* stdin_data = read_stdin_from_flow ~sw buf_read in 181 Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data); 182 Ok { request with stdin_data } 183 | Filter -> 184 Printf.eprintf "[DEBUG] read_request_streams: Filter role, reading STDIN and DATA\n%!"; 185 let* stdin_data = read_stdin_from_flow ~sw buf_read in 186 Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data); 187 let request = { request with stdin_data } in 188 let* data = read_data_from_flow buf_read in 189 Printf.eprintf "[DEBUG] read_request_streams: Got DATA stream, %d bytes\n%!" (String.length data); 190 Ok { request with data_stream = Some data } 191 192let read_request_from_flow ~sw flow = 193 Printf.eprintf "[DEBUG] read_request_from_flow: Starting\n%!"; 194 let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in 195 try 196 (* Read BEGIN_REQUEST *) 197 Printf.eprintf "[DEBUG] read_request_from_flow: Reading BEGIN_REQUEST record\n%!"; 198 let begin_record = Fastcgi_record.read buf_read in 199 Printf.eprintf "[DEBUG] read_request_from_flow: Got BEGIN_REQUEST record: %s\n%!" 200 (Format.asprintf "%a" (Fastcgi_record.pp ~max_content_len:50) begin_record); 201 let* request = create begin_record in 202 Printf.eprintf "[DEBUG] read_request_from_flow: Created request with role=%s, id=%d\n%!" 203 (Format.asprintf "%a" pp_role request.role) request.request_id; 204 (* Read PARAMS stream *) 205 Printf.eprintf "[DEBUG] read_request_from_flow: Reading PARAMS stream\n%!"; 206 let* params = read_params_from_flow ~sw buf_read in 207 Printf.eprintf "[DEBUG] read_request_from_flow: Got %d params\n%!" (Fastcgi_record.KV.cardinal params); 208 let request = { request with params } in 209 (* Read remaining streams based on role *) 210 Printf.eprintf "[DEBUG] read_request_from_flow: Reading streams for role=%s\n%!" 211 (Format.asprintf "%a" pp_role request.role); 212 let result = read_request_streams ~sw request buf_read in 213 Printf.eprintf "[DEBUG] read_request_from_flow: Finished reading request\n%!"; 214 result 215 with 216 | End_of_file -> 217 Printf.eprintf "[DEBUG] read_request_from_flow: Hit End_of_file\n%!"; 218 Error "Unexpected end of stream" 219 | exn -> 220 Printf.eprintf "[DEBUG] read_request_from_flow: Exception: %s\n%!" (Printexc.to_string exn); 221 Error (Printf.sprintf "Error reading request: %s" (Printexc.to_string exn)) 222 223(** {1 Response Generation} *) 224 225type app_status = int 226type protocol_status = 227 | Request_complete 228 | Cant_mpx_conn 229 | Overloaded 230 | Unknown_role 231 232let pp_protocol_status ppf = function 233 | Request_complete -> Format.pp_print_string ppf "Request_complete" 234 | Cant_mpx_conn -> Format.pp_print_string ppf "Cant_mpx_conn" 235 | Overloaded -> Format.pp_print_string ppf "Overloaded" 236 | Unknown_role -> Format.pp_print_string ppf "Unknown_role" 237 238let protocol_status_to_int = function 239 | Request_complete -> 0 240 | Cant_mpx_conn -> 1 241 | Overloaded -> 2 242 | Unknown_role -> 3 243 244let stream_records_to_string records = 245 let buf = Buffer.create 1024 in 246 List.iter (fun record -> 247 if not (is_stream_terminator record) then 248 Buffer.add_string buf record.content 249 ) records; 250 Buffer.contents buf 251 252let string_to_stream_records ~request_id ~record_type content = 253 let max_chunk = 65535 in (* FastCGI max record content length *) 254 let len = String.length content in 255 let records = ref [] in 256 257 let rec chunk_string pos = 258 if pos >= len then 259 () (* Empty terminator will be added separately *) 260 else 261 let chunk_len = min max_chunk (len - pos) in 262 let chunk = String.sub content pos chunk_len in 263 let record = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:chunk in 264 records := record :: !records; 265 chunk_string (pos + chunk_len) 266 in 267 268 chunk_string 0; 269 270 (* Add stream terminator *) 271 let terminator = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:"" in 272 records := terminator :: !records; 273 274 List.rev !records 275 276let flow_to_stream_records ~sw:_ ~request_id ~record_type flow = 277 (* Read entire flow content *) 278 let buf = Buffer.create 4096 in 279 Eio.Flow.copy flow (Eio.Flow.buffer_sink buf); 280 let content = Buffer.contents buf in 281 string_to_stream_records ~request_id ~record_type content 282 283let write_stream_records records sink = 284 (* Create a function to serialize a single record to a string *) 285 let serialize_record record = 286 let buf = Buffer.create 512 in 287 let buf_sink = Eio.Flow.buffer_sink buf in 288 Eio.Buf_write.with_flow buf_sink (fun buf_write -> 289 Fastcgi_record.write buf_write record 290 ); 291 Buffer.contents buf 292 in 293 294 (* Serialize all records and write to sink *) 295 List.iter (fun record -> 296 let serialized = serialize_record record in 297 Eio.Flow.copy_string serialized sink 298 ) records 299 300let make_end_request ~request_id ~app_status ~protocol_status = 301 let content = 302 let buf = Bytes.create 8 in 303 Bytes.set_int32_be buf 0 (Int32.of_int app_status); 304 Bytes.set_uint8 buf 4 (protocol_status_to_int protocol_status); 305 Bytes.set_uint8 buf 5 0; (* reserved *) 306 Bytes.set_uint8 buf 6 0; (* reserved *) 307 Bytes.set_uint8 buf 7 0; (* reserved *) 308 Bytes.to_string buf 309 in 310 Fastcgi_record.create ~version:1 ~record:End_request ~request_id ~content 311 312let write_response ~sw request ~stdout ~stderr sink app_status = 313 (* Convert stdout flow to STDOUT records *) 314 let stdout_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stdout stdout in 315 316 (* Convert stderr flow to STDERR records *) 317 let stderr_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stderr stderr in 318 319 (* Create END_REQUEST record *) 320 let end_record = make_end_request ~request_id:request.request_id ~app_status ~protocol_status:Request_complete in 321 322 (* Write all records *) 323 let all_records = stdout_records @ stderr_records @ [end_record] in 324 write_stream_records all_records sink 325 326let write_error_response request sink proto_status = 327 let end_record = make_end_request ~request_id:request.request_id ~app_status:1 ~protocol_status:proto_status in 328 write_stream_records [end_record] sink 329 330let write_abort_response request sink = 331 let end_record = make_end_request ~request_id:request.request_id ~app_status:0 ~protocol_status:Request_complete in 332 write_stream_records [end_record] sink 333 334(** {1 High-level Request Processing} *) 335 336type handler = t -> 337 stdout:Eio.Flow.sink_ty Eio.Resource.t -> 338 stderr:Eio.Flow.sink_ty Eio.Resource.t -> 339 app_status 340 341let process_request ~sw request handler sink = 342 (* Create in-memory flows for stdout and stderr *) 343 let stdout_buf = Buffer.create 4096 in 344 let stderr_buf = Buffer.create 1024 in 345 let stdout_sink = Eio.Flow.buffer_sink stdout_buf in 346 let stderr_sink = Eio.Flow.buffer_sink stderr_buf in 347 348 (* Call handler *) 349 let app_status = handler request ~stdout:stdout_sink ~stderr:stderr_sink in 350 351 (* Convert buffers to sources and write response *) 352 let stdout_source = Eio.Flow.string_source (Buffer.contents stdout_buf) in 353 let stderr_source = Eio.Flow.string_source (Buffer.contents stderr_buf) in 354 355 write_response ~sw request ~stdout:stdout_source ~stderr:stderr_source sink app_status 356 357let process_request_with_flows ~sw request ~stdout ~stderr sink app_status = 358 write_response ~sw request ~stdout ~stderr sink app_status 359 360(** {1 Connection Management} *) 361 362let handle_connection ~sw flow handler = 363 let _buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in 364 let _buf_write = Eio.Buf_write.create 4096 in 365 366 let rec loop () = 367 try 368 (* Read next request *) 369 match read_request_from_flow ~sw flow with 370 | Error msg -> 371 (* Log error and continue or close connection *) 372 Printf.eprintf "Error reading request: %s\n%!" msg 373 | Ok request -> 374 (* Process request *) 375 let response_buf = Buffer.create 4096 in 376 let response_sink = Eio.Flow.buffer_sink response_buf in 377 378 process_request ~sw request handler response_sink; 379 380 (* Write response to connection *) 381 let response_data = Buffer.contents response_buf in 382 Eio.Flow.copy (Eio.Flow.string_source response_data) flow; 383 384 (* Continue if keep_conn is true *) 385 if request.keep_conn then 386 loop () 387 with 388 | End_of_file -> () (* Connection closed *) 389 | exn -> 390 Printf.eprintf "Connection error: %s\n%!" (Printexc.to_string exn) 391 in 392 loop () 393 394let serve ~sw:_ ~backlog:_ ~port:_ _handler = 395 (* This would typically use Eio.Net to create a listening socket *) 396 (* For now, we'll provide a placeholder implementation *) 397 failwith "serve: Implementation requires Eio.Net integration"