···
+
(** {1 Request Roles} *)
+
let pp_role ppf = function
+
| Responder -> Format.pp_print_string ppf "Responder"
+
| Authorizer -> Format.pp_print_string ppf "Authorizer"
+
| Filter -> Format.pp_print_string ppf "Filter"
+
let role_of_begin_request record =
+
if record.record_type <> Begin_request then
+
Error "Expected BEGIN_REQUEST record"
+
else if String.length record.content <> 8 then
+
Error "Invalid BEGIN_REQUEST content length"
+
let content = record.content in
+
let role_int = (Char.code content.[0] lsl 8) lor (Char.code content.[1]) in
+
| n -> Error (Printf.sprintf "Unknown FastCGI role: %d" n)
+
(** {1 Request Context} *)
+
request_id : request_id;
+
data_stream : string option;
+
let data_str = match request.data_stream with
+
| Some d -> Printf.sprintf "Some(%d bytes)" (String.length d)
+
"@[<2>{ request_id = %d;@ role = %a;@ keep_conn = %b;@ params = %a;@ stdin = %d bytes;@ data = %s }@]"
+
(String.length request.stdin_data)
+
match role_of_begin_request record with
+
if String.length record.content <> 8 then
+
Error "Invalid BEGIN_REQUEST content length"
+
let flags_int = Char.code record.content.[2] in
+
let keep_conn = (flags_int land 1) <> 0 in
+
request_id = record.request_id;
+
data_stream = if role = Filter then Some "" else None;
+
(** {1 Stream Processing} *)
+
(** Helper functions for result binding to simplify nested pattern matching *)
+
let ( let* ) = Result.bind
+
let is_stream_terminator record =
+
String.length record.content = 0
+
let read_params_from_flow ~sw:_ flow =
+
let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
+
let params = ref KV.empty in
+
let record = Fastcgi_record.read buf_read in
+
if record.record_type <> Params then
+
Error (Printf.sprintf "Expected PARAMS record, got %s"
+
(Format.asprintf "%a" pp_record record.record_type))
+
else if is_stream_terminator record then
+
let record_params = KV.decode record.content in
+
params := KV.to_seq record_params
+
|> Seq.fold_left (fun acc (k, v) -> KV.add k v acc) !params;
+
| End_of_file -> Error "Unexpected end of stream while reading PARAMS"
+
| exn -> Error (Printf.sprintf "Error reading PARAMS: %s" (Printexc.to_string exn))
+
let read_stdin_from_flow ~sw:_ flow =
+
let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
+
let data = Buffer.create 1024 in
+
let record = Fastcgi_record.read buf_read in
+
if record.record_type <> Stdin then
+
Error (Printf.sprintf "Expected STDIN record, got %s"
+
(Format.asprintf "%a" pp_record record.record_type))
+
else if is_stream_terminator record then
+
Ok (Buffer.contents data)
+
Buffer.add_string data record.content;
+
| End_of_file -> Error "Unexpected end of stream while reading STDIN"
+
| exn -> Error (Printf.sprintf "Error reading STDIN: %s" (Printexc.to_string exn))
+
(** Read DATA stream for Filter role *)
+
let read_data_from_flow buf_read =
+
let data_buf = Buffer.create 1024 in
+
let record = Fastcgi_record.read buf_read in
+
if record.record_type <> Data then
+
Error "Expected DATA record"
+
else if is_stream_terminator record then
+
Ok (Buffer.contents data_buf)
+
Buffer.add_string data_buf record.content;
+
| End_of_file -> Error "Unexpected end of DATA stream"
+
| exn -> Error (Printf.sprintf "Error reading DATA: %s" (Printexc.to_string exn))
+
(** Read request streams based on role *)
+
let read_request_streams ~sw request flow buf_read =
+
match request.role with
+
let* stdin_data = read_stdin_from_flow ~sw flow in
+
Ok { request with stdin_data }
+
let* stdin_data = read_stdin_from_flow ~sw flow in
+
let request = { request with stdin_data } in
+
let* data = read_data_from_flow buf_read in
+
Ok { request with data_stream = Some data }
+
let read_request_from_flow ~sw flow =
+
let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
+
(* Read BEGIN_REQUEST *)
+
let begin_record = Fastcgi_record.read buf_read in
+
let* request = create begin_record in
+
(* Read PARAMS stream *)
+
let* params = read_params_from_flow ~sw flow in
+
let request = { request with params } in
+
(* Read remaining streams based on role *)
+
read_request_streams ~sw request flow buf_read
+
| End_of_file -> Error "Unexpected end of stream"
+
| exn -> Error (Printf.sprintf "Error reading request: %s" (Printexc.to_string exn))
+
(** {1 Response Generation} *)
+
let pp_protocol_status ppf = function
+
| Request_complete -> Format.pp_print_string ppf "Request_complete"
+
| Cant_mpx_conn -> Format.pp_print_string ppf "Cant_mpx_conn"
+
| Overloaded -> Format.pp_print_string ppf "Overloaded"
+
| Unknown_role -> Format.pp_print_string ppf "Unknown_role"
+
let protocol_status_to_int = function
+
| Request_complete -> 0
+
let stream_records_to_string records =
+
let buf = Buffer.create 1024 in
+
List.iter (fun record ->
+
if not (is_stream_terminator record) then
+
Buffer.add_string buf record.content
+
let string_to_stream_records ~request_id ~record_type content =
+
let max_chunk = 65535 in (* FastCGI max record content length *)
+
let len = String.length content in
+
let records = ref [] in
+
let rec chunk_string pos =
+
() (* Empty terminator will be added separately *)
+
let chunk_len = min max_chunk (len - pos) in
+
let chunk = String.sub content pos chunk_len in
+
let record = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:chunk in
+
records := record :: !records;
+
chunk_string (pos + chunk_len)
+
(* Add stream terminator *)
+
let terminator = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:"" in
+
records := terminator :: !records;
+
let flow_to_stream_records ~sw:_ ~request_id ~record_type flow =
+
(* Read entire flow content *)
+
let buf = Buffer.create 4096 in
+
Eio.Flow.copy flow (Eio.Flow.buffer_sink buf);
+
let content = Buffer.contents buf in
+
string_to_stream_records ~request_id ~record_type content
+
let write_stream_records records sink =
+
(* Create a function to serialize a single record to a string *)
+
let serialize_record record =
+
let buf = Buffer.create 512 in
+
let buf_sink = Eio.Flow.buffer_sink buf in
+
Eio.Buf_write.with_flow buf_sink (fun buf_write ->
+
Fastcgi_record.write buf_write record
+
(* Serialize all records and write to sink *)
+
List.iter (fun record ->
+
let serialized = serialize_record record in
+
Eio.Flow.copy_string serialized sink
+
let make_end_request ~request_id ~app_status ~protocol_status =
+
let buf = Bytes.create 8 in
+
Bytes.set_int32_be buf 0 (Int32.of_int app_status);
+
Bytes.set_uint8 buf 4 (protocol_status_to_int protocol_status);
+
Bytes.set_uint8 buf 5 0; (* reserved *)
+
Bytes.set_uint8 buf 6 0; (* reserved *)
+
Bytes.set_uint8 buf 7 0; (* reserved *)
+
Fastcgi_record.create ~version:1 ~record:End_request ~request_id ~content
+
let write_response ~sw request ~stdout ~stderr sink app_status =
+
(* Convert stdout flow to STDOUT records *)
+
let stdout_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stdout stdout in
+
(* Convert stderr flow to STDERR records *)
+
let stderr_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stderr stderr in
+
(* Create END_REQUEST record *)
+
let end_record = make_end_request ~request_id:request.request_id ~app_status ~protocol_status:Request_complete in
+
(* Write all records *)
+
let all_records = stdout_records @ stderr_records @ [end_record] in
+
write_stream_records all_records sink
+
let write_error_response request sink proto_status =
+
let end_record = make_end_request ~request_id:request.request_id ~app_status:1 ~protocol_status:proto_status in
+
write_stream_records [end_record] sink
+
let write_abort_response request sink =
+
let end_record = make_end_request ~request_id:request.request_id ~app_status:0 ~protocol_status:Request_complete in
+
write_stream_records [end_record] sink
+
(** {1 High-level Request Processing} *)
+
stdout:Eio.Flow.sink_ty Eio.Resource.t ->
+
stderr:Eio.Flow.sink_ty Eio.Resource.t ->
+
let process_request ~sw request handler sink =
+
(* Create in-memory flows for stdout and stderr *)
+
let stdout_buf = Buffer.create 4096 in
+
let stderr_buf = Buffer.create 1024 in
+
let stdout_sink = Eio.Flow.buffer_sink stdout_buf in
+
let stderr_sink = Eio.Flow.buffer_sink stderr_buf in
+
let app_status = handler request ~stdout:stdout_sink ~stderr:stderr_sink in
+
(* Convert buffers to sources and write response *)
+
let stdout_source = Eio.Flow.string_source (Buffer.contents stdout_buf) in
+
let stderr_source = Eio.Flow.string_source (Buffer.contents stderr_buf) in
+
write_response ~sw request ~stdout:stdout_source ~stderr:stderr_source sink app_status
+
let process_request_with_flows ~sw request ~stdout ~stderr sink app_status =
+
write_response ~sw request ~stdout ~stderr sink app_status
+
(** {1 Connection Management} *)
+
let handle_connection ~sw flow handler =
+
let _buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
+
let _buf_write = Eio.Buf_write.create 4096 in
+
(* Read next request *)
+
match read_request_from_flow ~sw flow with
+
(* Log error and continue or close connection *)
+
Printf.eprintf "Error reading request: %s\n%!" msg
+
let response_buf = Buffer.create 4096 in
+
let response_sink = Eio.Flow.buffer_sink response_buf in
+
process_request ~sw request handler response_sink;
+
(* Write response to connection *)
+
let response_data = Buffer.contents response_buf in
+
Eio.Flow.copy (Eio.Flow.string_source response_data) flow;
+
(* Continue if keep_conn is true *)
+
if request.keep_conn then
+
| End_of_file -> () (* Connection closed *)
+
Printf.eprintf "Connection error: %s\n%!" (Printexc.to_string exn)
+
let serve ~sw:_ ~backlog:_ ~port:_ _handler =
+
(* This would typically use Eio.Net to create a listening socket *)
+
(* For now, we'll provide a placeholder implementation *)
+
failwith "serve: Implementation requires Eio.Net integration"