···
String.length record.content = 0
82
-
let read_params_from_flow ~sw:_ buf_read =
82
+
let read_params buf_read =
Printf.eprintf "[DEBUG] read_params_from_flow: Starting\n%!";
let params = ref KV.empty in
···
117
-
let read_stdin_from_flow ~sw:_ buf_read =
117
+
let read_stdin buf_read =
Printf.eprintf "[DEBUG] read_stdin_from_flow: Starting\n%!";
let data = Buffer.create 1024 in
···
(** Read DATA stream for Filter role *)
151
-
let read_data_from_flow buf_read =
151
+
let read_data buf_read =
let data_buf = Buffer.create 1024 in
···
(** Read request streams based on role *)
171
-
let read_request_streams ~sw request buf_read =
171
+
let read_request_streams request buf_read =
Printf.eprintf "[DEBUG] read_request_streams: Processing role=%s\n%!"
(Format.asprintf "%a" pp_role request.role);
···
Printf.eprintf "[DEBUG] read_request_streams: Responder role, reading STDIN\n%!";
180
-
let* stdin_data = read_stdin_from_flow ~sw buf_read in
180
+
let* stdin_data = read_stdin buf_read in
Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data);
Ok { request with stdin_data }
Printf.eprintf "[DEBUG] read_request_streams: Filter role, reading STDIN and DATA\n%!";
185
-
let* stdin_data = read_stdin_from_flow ~sw buf_read in
185
+
let* stdin_data = read_stdin buf_read in
Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data);
let request = { request with stdin_data } in
188
-
let* data = read_data_from_flow buf_read in
188
+
let* data = read_data buf_read in
Printf.eprintf "[DEBUG] read_request_streams: Got DATA stream, %d bytes\n%!" (String.length data);
Ok { request with data_stream = Some data }
192
-
let read_request_from_flow ~sw flow =
193
-
Printf.eprintf "[DEBUG] read_request_from_flow: Starting\n%!";
194
-
let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
192
+
let read_request buf_read =
193
+
Printf.eprintf "[DEBUG] read_request: Starting\n%!";
197
-
Printf.eprintf "[DEBUG] read_request_from_flow: Reading BEGIN_REQUEST record\n%!";
196
+
Printf.eprintf "[DEBUG] read_request: Reading BEGIN_REQUEST record\n%!";
let begin_record = Fastcgi_record.read buf_read in
199
-
Printf.eprintf "[DEBUG] read_request_from_flow: Got BEGIN_REQUEST record: %s\n%!"
198
+
Printf.eprintf "[DEBUG] read_request: Got BEGIN_REQUEST record: %s\n%!"
(Format.asprintf "%a" (Fastcgi_record.pp ~max_content_len:50) begin_record);
let* request = create begin_record in
202
-
Printf.eprintf "[DEBUG] read_request_from_flow: Created request with role=%s, id=%d\n%!"
201
+
Printf.eprintf "[DEBUG] read_request: Created request with role=%s, id=%d\n%!"
(Format.asprintf "%a" pp_role request.role) request.request_id;
205
-
Printf.eprintf "[DEBUG] read_request_from_flow: Reading PARAMS stream\n%!";
206
-
let* params = read_params_from_flow ~sw buf_read in
207
-
Printf.eprintf "[DEBUG] read_request_from_flow: Got %d params\n%!" (Fastcgi_record.KV.cardinal params);
204
+
Printf.eprintf "[DEBUG] read_request: Reading PARAMS stream\n%!";
205
+
let* params = read_params buf_read in
206
+
Printf.eprintf "[DEBUG] read_request: Got %d params\n%!" (Fastcgi_record.KV.cardinal params);
let request = { request with params } in
(* Read remaining streams based on role *)
210
-
Printf.eprintf "[DEBUG] read_request_from_flow: Reading streams for role=%s\n%!"
209
+
Printf.eprintf "[DEBUG] read_request: Reading streams for role=%s\n%!"
(Format.asprintf "%a" pp_role request.role);
212
-
let result = read_request_streams ~sw request buf_read in
213
-
Printf.eprintf "[DEBUG] read_request_from_flow: Finished reading request\n%!";
211
+
let result = read_request_streams request buf_read in
212
+
Printf.eprintf "[DEBUG] read_request: Finished reading request\n%!";
217
-
Printf.eprintf "[DEBUG] read_request_from_flow: Hit End_of_file\n%!";
216
+
Printf.eprintf "[DEBUG] read_request: Hit End_of_file\n%!";
Error "Unexpected end of stream"
220
-
Printf.eprintf "[DEBUG] read_request_from_flow: Exception: %s\n%!" (Printexc.to_string exn);
219
+
Printf.eprintf "[DEBUG] read_request: Exception: %s\n%!" (Printexc.to_string exn);
Error (Printf.sprintf "Error reading request: %s" (Printexc.to_string exn))
(** {1 Response Generation} *)
···
276
-
let flow_to_stream_records ~sw:_ ~request_id ~record_type flow =
277
-
(* Read entire flow content *)
278
-
let buf = Buffer.create 4096 in
279
-
Eio.Flow.copy flow (Eio.Flow.buffer_sink buf);
280
-
let content = Buffer.contents buf in
281
-
string_to_stream_records ~request_id ~record_type content
283
-
let write_stream_records records sink =
284
-
(* Create a function to serialize a single record to a string *)
285
-
let serialize_record record =
286
-
let buf = Buffer.create 512 in
287
-
let buf_sink = Eio.Flow.buffer_sink buf in
288
-
Eio.Buf_write.with_flow buf_sink (fun buf_write ->
289
-
Fastcgi_record.write buf_write record
291
-
Buffer.contents buf
275
+
let write_stream_records buf_write request_id record_type content =
276
+
let max_chunk = 65535 in (* FastCGI max record content length *)
277
+
let len = String.length content in
279
+
let rec chunk_string pos =
281
+
() (* Empty terminator will be added separately *)
283
+
let chunk_len = min max_chunk (len - pos) in
284
+
let chunk = String.sub content pos chunk_len in
285
+
let record = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:chunk in
286
+
Fastcgi_record.write buf_write record;
287
+
chunk_string (pos + chunk_len)
294
-
(* Serialize all records and write to sink *)
295
-
List.iter (fun record ->
296
-
let serialized = serialize_record record in
297
-
Eio.Flow.copy_string serialized sink
292
+
(* Add stream terminator *)
293
+
let terminator = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:"" in
294
+
Fastcgi_record.write buf_write terminator
300
-
let make_end_request ~request_id ~app_status ~protocol_status =
296
+
let write_stdout_records buf_write request_id content =
297
+
write_stream_records buf_write request_id Stdout content
299
+
let write_stderr_records buf_write request_id content =
300
+
write_stream_records buf_write request_id Stderr content
302
+
let write_end_request buf_write request_id app_status protocol_status =
let buf = Bytes.create 8 in
Bytes.set_int32_be buf 0 (Int32.of_int app_status);
···
Bytes.set_uint8 buf 7 0; (* reserved *)
310
-
Fastcgi_record.create ~version:1 ~record:End_request ~request_id ~content
312
-
let write_response ~sw request ~stdout ~stderr sink app_status =
313
-
(* Convert stdout flow to STDOUT records *)
314
-
let stdout_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stdout stdout in
316
-
(* Convert stderr flow to STDERR records *)
317
-
let stderr_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stderr stderr in
319
-
(* Create END_REQUEST record *)
320
-
let end_record = make_end_request ~request_id:request.request_id ~app_status ~protocol_status:Request_complete in
322
-
(* Write all records *)
323
-
let all_records = stdout_records @ stderr_records @ [end_record] in
324
-
write_stream_records all_records sink
326
-
let write_error_response request sink proto_status =
327
-
let end_record = make_end_request ~request_id:request.request_id ~app_status:1 ~protocol_status:proto_status in
328
-
write_stream_records [end_record] sink
330
-
let write_abort_response request sink =
331
-
let end_record = make_end_request ~request_id:request.request_id ~app_status:0 ~protocol_status:Request_complete in
332
-
write_stream_records [end_record] sink
334
-
(** {1 High-level Request Processing} *)
336
-
type handler = t ->
337
-
stdout:Eio.Flow.sink_ty Eio.Resource.t ->
338
-
stderr:Eio.Flow.sink_ty Eio.Resource.t ->
341
-
let process_request ~sw request handler sink =
342
-
(* Create in-memory flows for stdout and stderr *)
343
-
let stdout_buf = Buffer.create 4096 in
344
-
let stderr_buf = Buffer.create 1024 in
345
-
let stdout_sink = Eio.Flow.buffer_sink stdout_buf in
346
-
let stderr_sink = Eio.Flow.buffer_sink stderr_buf in
349
-
let app_status = handler request ~stdout:stdout_sink ~stderr:stderr_sink in
351
-
(* Convert buffers to sources and write response *)
352
-
let stdout_source = Eio.Flow.string_source (Buffer.contents stdout_buf) in
353
-
let stderr_source = Eio.Flow.string_source (Buffer.contents stderr_buf) in
355
-
write_response ~sw request ~stdout:stdout_source ~stderr:stderr_source sink app_status
357
-
let process_request_with_flows ~sw request ~stdout ~stderr sink app_status =
358
-
write_response ~sw request ~stdout ~stderr sink app_status
360
-
(** {1 Connection Management} *)
362
-
let handle_connection ~sw flow handler =
363
-
let _buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
364
-
let _buf_write = Eio.Buf_write.create 4096 in
368
-
(* Read next request *)
369
-
match read_request_from_flow ~sw flow with
371
-
(* Log error and continue or close connection *)
372
-
Printf.eprintf "Error reading request: %s\n%!" msg
374
-
(* Process request *)
375
-
let response_buf = Buffer.create 4096 in
376
-
let response_sink = Eio.Flow.buffer_sink response_buf in
378
-
process_request ~sw request handler response_sink;
380
-
(* Write response to connection *)
381
-
let response_data = Buffer.contents response_buf in
382
-
Eio.Flow.copy (Eio.Flow.string_source response_data) flow;
384
-
(* Continue if keep_conn is true *)
385
-
if request.keep_conn then
388
-
| End_of_file -> () (* Connection closed *)
390
-
Printf.eprintf "Connection error: %s\n%!" (Printexc.to_string exn)
312
+
let record = Fastcgi_record.create ~version:1 ~record:End_request ~request_id ~content in
313
+
Fastcgi_record.write buf_write record
394
-
let serve ~sw:_ ~backlog:_ ~port:_ _handler =
395
-
(* This would typically use Eio.Net to create a listening socket *)
396
-
(* For now, we'll provide a placeholder implementation *)
397
-
failwith "serve: Implementation requires Eio.Net integration"