FastCGI implementation in OCaml
1open Fastcgi_record
2
3(** {1 Request Roles} *)
4
5type role =
6 | Responder
7 | Authorizer
8 | Filter
9
10let pp_role ppf = function
11 | Responder -> Format.pp_print_string ppf "Responder"
12 | Authorizer -> Format.pp_print_string ppf "Authorizer"
13 | Filter -> Format.pp_print_string ppf "Filter"
14
15let role_of_begin_request record =
16 if record.record_type <> Begin_request then
17 Error "Expected BEGIN_REQUEST record"
18 else if String.length record.content <> 8 then
19 Error "Invalid BEGIN_REQUEST content length"
20 else
21 let content = record.content in
22 let role_int = (Char.code content.[0] lsl 8) lor (Char.code content.[1]) in
23 match role_int with
24 | 1 -> Ok Responder
25 | 2 -> Ok Authorizer
26 | 3 -> Ok Filter
27 | n -> Error (Printf.sprintf "Unknown FastCGI role: %d" n)
28
29(** {1 Request Context} *)
30
31type t = {
32 request_id : request_id;
33 role : role;
34 keep_conn : bool;
35 params : KV.t;
36 stdin_data : string;
37 data_stream : string option;
38}
39
40let pp ppf request =
41 let data_str = match request.data_stream with
42 | None -> "None"
43 | Some d -> Printf.sprintf "Some(%d bytes)" (String.length d)
44 in
45 Format.fprintf ppf
46 "@[<2>{ request_id = %d;@ role = %a;@ keep_conn = %b;@ params = %a;@ stdin = %d bytes;@ data = %s }@]"
47 request.request_id
48 pp_role request.role
49 request.keep_conn
50 (KV.pp) request.params
51 (String.length request.stdin_data)
52 data_str
53
54let create record =
55 match role_of_begin_request record with
56 | Error _ as e -> e
57 | Ok role ->
58 if String.length record.content <> 8 then
59 Error "Invalid BEGIN_REQUEST content length"
60 else
61 let flags_int = Char.code record.content.[2] in
62 let keep_conn = (flags_int land 1) <> 0 in
63 Ok {
64 request_id = record.request_id;
65 role;
66 keep_conn;
67 params = KV.empty;
68 stdin_data = "";
69 data_stream = if role = Filter then Some "" else None;
70 }
71
72
73(** {1 Stream Processing} *)
74
75(** Helper functions for result binding to simplify nested pattern matching *)
76let ( let* ) = Result.bind
77
78let is_stream_terminator record =
79 String.length record.content = 0
80
81
82let read_params_from_flow ~sw:_ buf_read =
83 Printf.eprintf "[DEBUG] read_params_from_flow: Starting\n%!";
84 let params = ref KV.empty in
85 let rec loop () =
86 try
87 Printf.eprintf "[DEBUG] read_params_from_flow: Reading next PARAMS record\n%!";
88 let record = Fastcgi_record.read buf_read in
89 Printf.eprintf "[DEBUG] read_params_from_flow: Got record type=%s, content_length=%d\n%!"
90 (Format.asprintf "%a" pp_record record.record_type)
91 (String.length record.content);
92 if record.record_type <> Params then
93 Error (Printf.sprintf "Expected PARAMS record, got %s"
94 (Format.asprintf "%a" pp_record record.record_type))
95 else if is_stream_terminator record then (
96 Printf.eprintf "[DEBUG] read_params_from_flow: Got stream terminator, returning %d params\n%!"
97 (Fastcgi_record.KV.cardinal !params);
98 Ok !params
99 ) else (
100 let record_params = KV.decode record.content in
101 Printf.eprintf "[DEBUG] read_params_from_flow: Decoded %d params from record\n%!"
102 (Fastcgi_record.KV.cardinal record_params);
103 params := KV.to_seq record_params
104 |> Seq.fold_left (fun acc (k, v) -> KV.add k v acc) !params;
105 loop ()
106 )
107 with
108 | End_of_file ->
109 Printf.eprintf "[DEBUG] read_params_from_flow: Hit End_of_file\n%!";
110 Error "Unexpected end of stream while reading PARAMS"
111 | exn ->
112 Printf.eprintf "[DEBUG] read_params_from_flow: Exception: %s\n%!" (Printexc.to_string exn);
113 Error (Printf.sprintf "Error reading PARAMS: %s" (Printexc.to_string exn))
114 in
115 loop ()
116
117let read_stdin_from_flow ~sw:_ buf_read =
118 Printf.eprintf "[DEBUG] read_stdin_from_flow: Starting\n%!";
119 let data = Buffer.create 1024 in
120 let rec loop () =
121 try
122 Printf.eprintf "[DEBUG] read_stdin_from_flow: Reading next STDIN record\n%!";
123 let record = Fastcgi_record.read buf_read in
124 Printf.eprintf "[DEBUG] read_stdin_from_flow: Got record type=%s, content_length=%d\n%!"
125 (Format.asprintf "%a" pp_record record.record_type)
126 (String.length record.content);
127 if record.record_type <> Stdin then
128 Error (Printf.sprintf "Expected STDIN record, got %s"
129 (Format.asprintf "%a" pp_record record.record_type))
130 else if is_stream_terminator record then (
131 Printf.eprintf "[DEBUG] read_stdin_from_flow: Got stream terminator, total stdin=%d bytes\n%!"
132 (Buffer.length data);
133 Ok (Buffer.contents data)
134 ) else (
135 Buffer.add_string data record.content;
136 Printf.eprintf "[DEBUG] read_stdin_from_flow: Added %d bytes, total now %d\n%!"
137 (String.length record.content) (Buffer.length data);
138 loop ()
139 )
140 with
141 | End_of_file ->
142 Printf.eprintf "[DEBUG] read_stdin_from_flow: Hit End_of_file\n%!";
143 Error "Unexpected end of stream while reading STDIN"
144 | exn ->
145 Printf.eprintf "[DEBUG] read_stdin_from_flow: Exception: %s\n%!" (Printexc.to_string exn);
146 Error (Printf.sprintf "Error reading STDIN: %s" (Printexc.to_string exn))
147 in
148 loop ()
149
150(** Read DATA stream for Filter role *)
151let read_data_from_flow buf_read =
152 let data_buf = Buffer.create 1024 in
153 let rec read_data () =
154 try
155 let record = Fastcgi_record.read buf_read in
156 if record.record_type <> Data then
157 Error "Expected DATA record"
158 else if is_stream_terminator record then
159 Ok (Buffer.contents data_buf)
160 else (
161 Buffer.add_string data_buf record.content;
162 read_data ()
163 )
164 with
165 | End_of_file -> Error "Unexpected end of DATA stream"
166 | exn -> Error (Printf.sprintf "Error reading DATA: %s" (Printexc.to_string exn))
167 in
168 read_data ()
169
170(** Read request streams based on role *)
171let read_request_streams ~sw request buf_read =
172 Printf.eprintf "[DEBUG] read_request_streams: Processing role=%s\n%!"
173 (Format.asprintf "%a" pp_role request.role);
174 match request.role with
175 | Authorizer ->
176 Printf.eprintf "[DEBUG] read_request_streams: Authorizer role, no streams to read\n%!";
177 Ok request
178 | Responder ->
179 Printf.eprintf "[DEBUG] read_request_streams: Responder role, reading STDIN\n%!";
180 let* stdin_data = read_stdin_from_flow ~sw buf_read in
181 Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data);
182 Ok { request with stdin_data }
183 | Filter ->
184 Printf.eprintf "[DEBUG] read_request_streams: Filter role, reading STDIN and DATA\n%!";
185 let* stdin_data = read_stdin_from_flow ~sw buf_read in
186 Printf.eprintf "[DEBUG] read_request_streams: Got STDIN data, %d bytes\n%!" (String.length stdin_data);
187 let request = { request with stdin_data } in
188 let* data = read_data_from_flow buf_read in
189 Printf.eprintf "[DEBUG] read_request_streams: Got DATA stream, %d bytes\n%!" (String.length data);
190 Ok { request with data_stream = Some data }
191
192let read_request_from_flow ~sw flow =
193 Printf.eprintf "[DEBUG] read_request_from_flow: Starting\n%!";
194 let buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
195 try
196 (* Read BEGIN_REQUEST *)
197 Printf.eprintf "[DEBUG] read_request_from_flow: Reading BEGIN_REQUEST record\n%!";
198 let begin_record = Fastcgi_record.read buf_read in
199 Printf.eprintf "[DEBUG] read_request_from_flow: Got BEGIN_REQUEST record: %s\n%!"
200 (Format.asprintf "%a" (Fastcgi_record.pp ~max_content_len:50) begin_record);
201 let* request = create begin_record in
202 Printf.eprintf "[DEBUG] read_request_from_flow: Created request with role=%s, id=%d\n%!"
203 (Format.asprintf "%a" pp_role request.role) request.request_id;
204 (* Read PARAMS stream *)
205 Printf.eprintf "[DEBUG] read_request_from_flow: Reading PARAMS stream\n%!";
206 let* params = read_params_from_flow ~sw buf_read in
207 Printf.eprintf "[DEBUG] read_request_from_flow: Got %d params\n%!" (Fastcgi_record.KV.cardinal params);
208 let request = { request with params } in
209 (* Read remaining streams based on role *)
210 Printf.eprintf "[DEBUG] read_request_from_flow: Reading streams for role=%s\n%!"
211 (Format.asprintf "%a" pp_role request.role);
212 let result = read_request_streams ~sw request buf_read in
213 Printf.eprintf "[DEBUG] read_request_from_flow: Finished reading request\n%!";
214 result
215 with
216 | End_of_file ->
217 Printf.eprintf "[DEBUG] read_request_from_flow: Hit End_of_file\n%!";
218 Error "Unexpected end of stream"
219 | exn ->
220 Printf.eprintf "[DEBUG] read_request_from_flow: Exception: %s\n%!" (Printexc.to_string exn);
221 Error (Printf.sprintf "Error reading request: %s" (Printexc.to_string exn))
222
223(** {1 Response Generation} *)
224
225type app_status = int
226type protocol_status =
227 | Request_complete
228 | Cant_mpx_conn
229 | Overloaded
230 | Unknown_role
231
232let pp_protocol_status ppf = function
233 | Request_complete -> Format.pp_print_string ppf "Request_complete"
234 | Cant_mpx_conn -> Format.pp_print_string ppf "Cant_mpx_conn"
235 | Overloaded -> Format.pp_print_string ppf "Overloaded"
236 | Unknown_role -> Format.pp_print_string ppf "Unknown_role"
237
238let protocol_status_to_int = function
239 | Request_complete -> 0
240 | Cant_mpx_conn -> 1
241 | Overloaded -> 2
242 | Unknown_role -> 3
243
244let stream_records_to_string records =
245 let buf = Buffer.create 1024 in
246 List.iter (fun record ->
247 if not (is_stream_terminator record) then
248 Buffer.add_string buf record.content
249 ) records;
250 Buffer.contents buf
251
252let string_to_stream_records ~request_id ~record_type content =
253 let max_chunk = 65535 in (* FastCGI max record content length *)
254 let len = String.length content in
255 let records = ref [] in
256
257 let rec chunk_string pos =
258 if pos >= len then
259 () (* Empty terminator will be added separately *)
260 else
261 let chunk_len = min max_chunk (len - pos) in
262 let chunk = String.sub content pos chunk_len in
263 let record = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:chunk in
264 records := record :: !records;
265 chunk_string (pos + chunk_len)
266 in
267
268 chunk_string 0;
269
270 (* Add stream terminator *)
271 let terminator = Fastcgi_record.create ~version:1 ~record:record_type ~request_id ~content:"" in
272 records := terminator :: !records;
273
274 List.rev !records
275
276let flow_to_stream_records ~sw:_ ~request_id ~record_type flow =
277 (* Read entire flow content *)
278 let buf = Buffer.create 4096 in
279 Eio.Flow.copy flow (Eio.Flow.buffer_sink buf);
280 let content = Buffer.contents buf in
281 string_to_stream_records ~request_id ~record_type content
282
283let write_stream_records records sink =
284 (* Create a function to serialize a single record to a string *)
285 let serialize_record record =
286 let buf = Buffer.create 512 in
287 let buf_sink = Eio.Flow.buffer_sink buf in
288 Eio.Buf_write.with_flow buf_sink (fun buf_write ->
289 Fastcgi_record.write buf_write record
290 );
291 Buffer.contents buf
292 in
293
294 (* Serialize all records and write to sink *)
295 List.iter (fun record ->
296 let serialized = serialize_record record in
297 Eio.Flow.copy_string serialized sink
298 ) records
299
300let make_end_request ~request_id ~app_status ~protocol_status =
301 let content =
302 let buf = Bytes.create 8 in
303 Bytes.set_int32_be buf 0 (Int32.of_int app_status);
304 Bytes.set_uint8 buf 4 (protocol_status_to_int protocol_status);
305 Bytes.set_uint8 buf 5 0; (* reserved *)
306 Bytes.set_uint8 buf 6 0; (* reserved *)
307 Bytes.set_uint8 buf 7 0; (* reserved *)
308 Bytes.to_string buf
309 in
310 Fastcgi_record.create ~version:1 ~record:End_request ~request_id ~content
311
312let write_response ~sw request ~stdout ~stderr sink app_status =
313 (* Convert stdout flow to STDOUT records *)
314 let stdout_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stdout stdout in
315
316 (* Convert stderr flow to STDERR records *)
317 let stderr_records = flow_to_stream_records ~sw ~request_id:request.request_id ~record_type:Stderr stderr in
318
319 (* Create END_REQUEST record *)
320 let end_record = make_end_request ~request_id:request.request_id ~app_status ~protocol_status:Request_complete in
321
322 (* Write all records *)
323 let all_records = stdout_records @ stderr_records @ [end_record] in
324 write_stream_records all_records sink
325
326let write_error_response request sink proto_status =
327 let end_record = make_end_request ~request_id:request.request_id ~app_status:1 ~protocol_status:proto_status in
328 write_stream_records [end_record] sink
329
330let write_abort_response request sink =
331 let end_record = make_end_request ~request_id:request.request_id ~app_status:0 ~protocol_status:Request_complete in
332 write_stream_records [end_record] sink
333
334(** {1 High-level Request Processing} *)
335
336type handler = t ->
337 stdout:Eio.Flow.sink_ty Eio.Resource.t ->
338 stderr:Eio.Flow.sink_ty Eio.Resource.t ->
339 app_status
340
341let process_request ~sw request handler sink =
342 (* Create in-memory flows for stdout and stderr *)
343 let stdout_buf = Buffer.create 4096 in
344 let stderr_buf = Buffer.create 1024 in
345 let stdout_sink = Eio.Flow.buffer_sink stdout_buf in
346 let stderr_sink = Eio.Flow.buffer_sink stderr_buf in
347
348 (* Call handler *)
349 let app_status = handler request ~stdout:stdout_sink ~stderr:stderr_sink in
350
351 (* Convert buffers to sources and write response *)
352 let stdout_source = Eio.Flow.string_source (Buffer.contents stdout_buf) in
353 let stderr_source = Eio.Flow.string_source (Buffer.contents stderr_buf) in
354
355 write_response ~sw request ~stdout:stdout_source ~stderr:stderr_source sink app_status
356
357let process_request_with_flows ~sw request ~stdout ~stderr sink app_status =
358 write_response ~sw request ~stdout ~stderr sink app_status
359
360(** {1 Connection Management} *)
361
362let handle_connection ~sw flow handler =
363 let _buf_read = Eio.Buf_read.of_flow flow ~max_size:1000000 in
364 let _buf_write = Eio.Buf_write.create 4096 in
365
366 let rec loop () =
367 try
368 (* Read next request *)
369 match read_request_from_flow ~sw flow with
370 | Error msg ->
371 (* Log error and continue or close connection *)
372 Printf.eprintf "Error reading request: %s\n%!" msg
373 | Ok request ->
374 (* Process request *)
375 let response_buf = Buffer.create 4096 in
376 let response_sink = Eio.Flow.buffer_sink response_buf in
377
378 process_request ~sw request handler response_sink;
379
380 (* Write response to connection *)
381 let response_data = Buffer.contents response_buf in
382 Eio.Flow.copy (Eio.Flow.string_source response_data) flow;
383
384 (* Continue if keep_conn is true *)
385 if request.keep_conn then
386 loop ()
387 with
388 | End_of_file -> () (* Connection closed *)
389 | exn ->
390 Printf.eprintf "Connection error: %s\n%!" (Printexc.to_string exn)
391 in
392 loop ()
393
394let serve ~sw:_ ~backlog:_ ~port:_ _handler =
395 (* This would typically use Eio.Net to create a listening socket *)
396 (* For now, we'll provide a placeholder implementation *)
397 failwith "serve: Implementation requires Eio.Net integration"