#!/usr/bin/env python3
"""
Generate FastCGI test case files based on the specification.

Each ``create_*`` function returns the raw bytes of one FastCGI record (or,
for ``create_multiplexed_records``, a concatenation of several records).
Running the module as a script writes every entry of ``test_cases`` to a
file in the current working directory.
"""

import struct
import os

# FastCGI constants from the specification
FCGI_VERSION_1 = 1

# Record types
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11

# Roles
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3

# Flags
FCGI_KEEP_CONN = 1

# Protocol status
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3

# Form body shared by the POST fixtures: create_stdin_record() emits it and
# create_params_record_post() advertises its length as CONTENT_LENGTH, so the
# two fixtures cannot drift apart.
_FORM_DATA = b"name=John&email=john@example.com"


def create_record_header(version, record_type, request_id, content_length,
                         padding_length=0):
    """Create a FastCGI record header.

    The header is 8 bytes, big-endian: version (1 byte), type (1 byte),
    request id (2 bytes), content length (2 bytes), padding length (1 byte)
    and one reserved byte that is always written as 0.

    Raises:
        struct.error: if a value does not fit its field (for example a
            ``content_length`` above 65535).
    """
    return struct.pack('>BBHHBB',
                       version,
                       record_type,
                       request_id,
                       content_length,
                       padding_length,
                       0)  # reserved


def _encode_length(length: int) -> bytes:
    """Encode one name/value length: 1 byte below 128, else 4 bytes."""
    if length < 128:
        return struct.pack('B', length)
    # The long form is a 4-byte big-endian integer with the top bit set.
    return struct.pack('>I', length | 0x80000000)


def encode_name_value_pair(name, value):
    """Encode a name-value pair according to FastCGI spec.

    ``name`` and ``value`` are ``str`` objects and are encoded as UTF-8.
    The result is: name length, value length, name bytes, value bytes.
    """
    name_bytes = name.encode('utf-8')
    value_bytes = value.encode('utf-8')
    return (_encode_length(len(name_bytes))
            + _encode_length(len(value_bytes))
            + name_bytes
            + value_bytes)


def _encode_params(pairs) -> bytes:
    """Concatenate the encodings of an iterable of (name, value) pairs."""
    return b''.join(encode_name_value_pair(name, value)
                    for name, value in pairs)


def _build_record(record_type: int, request_id: int, content: bytes = b'',
                  padding_length: int = 0) -> bytes:
    """Return header + content + zero padding for a version 1 record."""
    header = create_record_header(FCGI_VERSION_1, record_type, request_id,
                                  len(content), padding_length)
    return header + content + b'\x00' * padding_length


def _begin_request_body(role: int, flags: int) -> bytes:
    """Pack an FCGI_BeginRequestBody: role, flags, 5 reserved bytes."""
    return struct.pack('>HB5x', role, flags)


def _end_request_body(app_status: int, protocol_status: int) -> bytes:
    """Pack an FCGI_EndRequestBody: app status, protocol status, 3 reserved."""
    return struct.pack('>IB3x', app_status, protocol_status)


def create_begin_request():
    """Create FCGI_BEGIN_REQUEST record (responder, keep-conn, request 1)."""
    return _build_record(FCGI_BEGIN_REQUEST, 1,
                         _begin_request_body(FCGI_RESPONDER, FCGI_KEEP_CONN))


def create_begin_request_no_keep_conn():
    """Create FCGI_BEGIN_REQUEST record without keep connection."""
    return _build_record(FCGI_BEGIN_REQUEST, 1,
                         _begin_request_body(FCGI_RESPONDER, 0))


def create_begin_request_authorizer():
    """Create FCGI_BEGIN_REQUEST record for authorizer role (request 2)."""
    return _build_record(FCGI_BEGIN_REQUEST, 2,
                         _begin_request_body(FCGI_AUTHORIZER, 0))


def create_begin_request_filter():
    """Create FCGI_BEGIN_REQUEST record for filter role (request 3)."""
    return _build_record(FCGI_BEGIN_REQUEST, 3,
                         _begin_request_body(FCGI_FILTER, FCGI_KEEP_CONN))


def create_end_request():
    """Create FCGI_END_REQUEST record (app_status=0, request complete)."""
    return _build_record(FCGI_END_REQUEST, 1,
                         _end_request_body(0, FCGI_REQUEST_COMPLETE))


def create_end_request_error():
    """Create FCGI_END_REQUEST record with error status (app_status=1)."""
    return _build_record(FCGI_END_REQUEST, 1,
                         _end_request_body(1, FCGI_REQUEST_COMPLETE))


def create_params_record():
    """Create FCGI_PARAMS record with CGI environment variables."""
    # Typical CGI parameters
    params = [
        ("REQUEST_METHOD", "GET"),
        ("SCRIPT_NAME", "/test.cgi"),
        ("REQUEST_URI", "/test.cgi?foo=bar"),
        ("QUERY_STRING", "foo=bar"),
        ("SERVER_NAME", "localhost"),
        ("SERVER_PORT", "80"),
        ("HTTP_HOST", "localhost"),
        ("HTTP_USER_AGENT", "Mozilla/5.0"),
        ("CONTENT_TYPE", ""),
        ("CONTENT_LENGTH", "0"),
    ]
    return _build_record(FCGI_PARAMS, 1, _encode_params(params))


def create_params_record_post():
    """Create FCGI_PARAMS record for POST request.

    CONTENT_LENGTH is derived from the form body that create_stdin_record()
    emits, so the advertised length always matches that body.
    """
    params = [
        ("REQUEST_METHOD", "POST"),
        ("SCRIPT_NAME", "/form.cgi"),
        ("REQUEST_URI", "/form.cgi"),
        ("QUERY_STRING", ""),
        ("SERVER_NAME", "localhost"),
        ("SERVER_PORT", "443"),
        ("HTTPS", "on"),
        ("HTTP_HOST", "localhost"),
        ("HTTP_USER_AGENT", "curl/7.68.0"),
        ("CONTENT_TYPE", "application/x-www-form-urlencoded"),
        ("CONTENT_LENGTH", str(len(_FORM_DATA))),
    ]
    return _build_record(FCGI_PARAMS, 1, _encode_params(params))


def create_empty_params():
    """Create empty FCGI_PARAMS record (end of params stream)."""
    return _build_record(FCGI_PARAMS, 1)


def create_stdin_record():
    """Create FCGI_STDIN record with form data."""
    return _build_record(FCGI_STDIN, 1, _FORM_DATA)


def create_empty_stdin():
    """Create empty FCGI_STDIN record (end of stdin stream)."""
    return _build_record(FCGI_STDIN, 1)


def create_stdout_record():
    """Create FCGI_STDOUT record with HTTP response."""
    response = b"Content-Type: text/html\r\n\r\nHello World"
    return _build_record(FCGI_STDOUT, 1, response)


def create_empty_stdout():
    """Create empty FCGI_STDOUT record (end of stdout stream)."""
    return _build_record(FCGI_STDOUT, 1)


def create_stderr_record():
    """Create FCGI_STDERR record with error message."""
    error_msg = b"Warning: Configuration file not found\n"
    return _build_record(FCGI_STDERR, 1, error_msg)


def create_empty_stderr():
    """Create empty FCGI_STDERR record (end of stderr stream)."""
    return _build_record(FCGI_STDERR, 1)


def create_get_values():
    """Create FCGI_GET_VALUES record (request id 0, empty values)."""
    # Request standard capability variables
    queried = [
        ("FCGI_MAX_CONNS", ""),
        ("FCGI_MAX_REQS", ""),
        ("FCGI_MPXS_CONNS", ""),
    ]
    return _build_record(FCGI_GET_VALUES, 0, _encode_params(queried))


def create_get_values_result():
    """Create FCGI_GET_VALUES_RESULT record (request id 0)."""
    answers = [
        ("FCGI_MAX_CONNS", "1"),
        ("FCGI_MAX_REQS", "1"),
        ("FCGI_MPXS_CONNS", "0"),
    ]
    return _build_record(FCGI_GET_VALUES_RESULT, 0, _encode_params(answers))


def create_unknown_type():
    """Create FCGI_UNKNOWN_TYPE record reporting unknown type 99."""
    # FCGI_UnknownTypeBody: the type byte followed by 7 reserved bytes.
    body = struct.pack('B7x', 99)
    return _build_record(FCGI_UNKNOWN_TYPE, 0, body)


def create_abort_request():
    """Create FCGI_ABORT_REQUEST record (no body)."""
    return _build_record(FCGI_ABORT_REQUEST, 1)


def create_data_record():
    """Create FCGI_DATA record (for Filter role, request 3)."""
    file_data = (b"This is file content that needs to be filtered\n"
                 b"Line 2\nLine 3\n")
    return _build_record(FCGI_DATA, 3, file_data)


def create_empty_data():
    """Create empty FCGI_DATA record (end of data stream)."""
    return _build_record(FCGI_DATA, 3)


def create_multiplexed_records():
    """Create a sequence showing multiplexed requests.

    Records for request ids 1 and 2 are interleaved in a single byte
    string; request 2 finishes before request 1.
    """
    success = _end_request_body(0, FCGI_REQUEST_COMPLETE)
    records = [
        # Request 1 begins
        create_begin_request(),
        create_params_record(),
        create_empty_params(),
        # Request 2 begins (different request ID)
        _build_record(FCGI_BEGIN_REQUEST, 2,
                      _begin_request_body(FCGI_RESPONDER, FCGI_KEEP_CONN)),
        # Request 1 continues
        create_empty_stdin(),
        # Request 2 params
        _build_record(FCGI_PARAMS, 2,
                      _encode_params([("REQUEST_METHOD", "POST"),
                                      ("SCRIPT_NAME", "/other.cgi")])),
        # Request 2 empty params
        _build_record(FCGI_PARAMS, 2),
        # Request 2 completes first
        _build_record(FCGI_STDOUT, 2,
                      b"Content-Type: text/plain\r\n\r\nRequest 2 done"),
        _build_record(FCGI_STDOUT, 2),
        _build_record(FCGI_END_REQUEST, 2, success),
        # Request 1 completes
        _build_record(FCGI_STDOUT, 1,
                      b"Content-Type: text/html\r\n\r\nRequest 1 done"),
        _build_record(FCGI_STDOUT, 1),
        _build_record(FCGI_END_REQUEST, 1, success),
    ]
    return b''.join(records)


def create_large_record():
    """Create a record with a large (near-maximum) content size.

    The content is 65000 bytes, just under the 65535-byte limit imposed by
    the 16-bit content length field of the header.
    """
    large_content = b"x" * 65000
    return _build_record(FCGI_STDOUT, 1, large_content)


def create_padded_record():
    """Create a record with padding for alignment."""
    data = b"Hello"  # 5 bytes
    padding_length = 3  # to align to 8-byte boundary (5 + 3 = 8)
    return _build_record(FCGI_STDOUT, 1, data, padding_length)


# Test case definitions: output file name -> function producing its bytes
test_cases = {
    "begin_request_responder.bin": create_begin_request,
    "begin_request_no_keep.bin": create_begin_request_no_keep_conn,
    "begin_request_authorizer.bin": create_begin_request_authorizer,
    "begin_request_filter.bin": create_begin_request_filter,
    "end_request_success.bin": create_end_request,
    "end_request_error.bin": create_end_request_error,
    "params_get.bin": create_params_record,
    "params_post.bin": create_params_record_post,
    "params_empty.bin": create_empty_params,
    "stdin_form_data.bin": create_stdin_record,
    "stdin_empty.bin": create_empty_stdin,
    "stdout_response.bin": create_stdout_record,
    "stdout_empty.bin": create_empty_stdout,
    "stderr_message.bin": create_stderr_record,
    "stderr_empty.bin": create_empty_stderr,
    "get_values.bin": create_get_values,
    "get_values_result.bin": create_get_values_result,
    "unknown_type.bin": create_unknown_type,
    "abort_request.bin": create_abort_request,
    "data_filter.bin": create_data_record,
    "data_empty.bin": create_empty_data,
    "multiplexed_requests.bin": create_multiplexed_records,
    "large_record.bin": create_large_record,
    "padded_record.bin": create_padded_record,
}


def main():
    """Write every test case to a file in the current directory."""
    for filename, creator in test_cases.items():
        with open(filename, 'wb') as f:
            f.write(creator())
        print(f"Created {filename}")

    print(f"\nGenerated {len(test_cases)} test case files")
    print("\nTest case descriptions:")
    print("- begin_request_*.bin: Various BEGIN_REQUEST records for different roles")
    print("- end_request_*.bin: END_REQUEST records with different status codes")
    print("- params_*.bin: PARAMS records with CGI environment variables")
    print("- stdin_*.bin: STDIN records with request body data")
    print("- stdout_*.bin: STDOUT records with response data")
    print("- stderr_*.bin: STDERR records with error messages")
    print("- get_values*.bin: Management records for capability negotiation")
    print("- unknown_type.bin: Unknown record type handling")
    print("- abort_request.bin: Request abortion")
    print("- data_*.bin: DATA records for Filter role")
    print("- multiplexed_requests.bin: Multiple concurrent requests")
    print("- large_record.bin: Maximum size record")
    print("- padded_record.bin: Record with padding for alignment")


if __name__ == "__main__":
    main()