at 25.11-pre 4.1 kB view raw
1{ pkgs, ... }: 2 3with pkgs.lib; 4 5let 6 makeKafkaTest = 7 name: 8 { 9 kafkaPackage, 10 mode ? "kraft", 11 }: 12 (import ../make-test-python.nix ({ 13 inherit name; 14 meta = with pkgs.lib.maintainers; { 15 maintainers = [ nequissimus ]; 16 }; 17 18 nodes = 19 { 20 kafka = 21 { ... }: 22 { 23 services.apache-kafka = mkMerge [ 24 ({ 25 enable = true; 26 package = kafkaPackage; 27 settings = { 28 "offsets.topic.replication.factor" = 1; 29 "log.dirs" = [ 30 "/var/lib/kafka/logdir1" 31 "/var/lib/kafka/logdir2" 32 ]; 33 }; 34 }) 35 (mkIf (mode == "zookeeper") { 36 settings = { 37 "zookeeper.session.timeout.ms" = 600000; 38 "zookeeper.connect" = [ "zookeeper1:2181" ]; 39 }; 40 }) 41 (mkIf (mode == "kraft") { 42 clusterId = "ak2fIHr4S8WWarOF_ODD0g"; 43 formatLogDirs = true; 44 settings = { 45 "node.id" = 1; 46 "process.roles" = [ 47 "broker" 48 "controller" 49 ]; 50 "listeners" = [ 51 "PLAINTEXT://:9092" 52 "CONTROLLER://:9093" 53 ]; 54 "listener.security.protocol.map" = [ 55 "PLAINTEXT:PLAINTEXT" 56 "CONTROLLER:PLAINTEXT" 57 ]; 58 "controller.quorum.voters" = [ 59 "1@kafka:9093" 60 ]; 61 "controller.listener.names" = [ "CONTROLLER" ]; 62 }; 63 }) 64 ]; 65 66 networking.firewall.allowedTCPPorts = [ 67 9092 68 9093 69 ]; 70 virtualisation.diskSize = 1024; 71 # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048) 72 virtualisation.memorySize = 2047; 73 }; 74 } 75 // optionalAttrs (mode == "zookeeper") { 76 zookeeper1 = 77 { ... }: 78 { 79 services.zookeeper = { 80 enable = true; 81 }; 82 83 networking.firewall.allowedTCPPorts = [ 2181 ]; 84 virtualisation.diskSize = 1024; 85 }; 86 }; 87 88 testScript = '' 89 start_all() 90 91 ${optionalString (mode == "zookeeper") '' 92 zookeeper1.wait_for_unit("default.target") 93 zookeeper1.wait_for_unit("zookeeper.service") 94 zookeeper1.wait_for_open_port(2181) 95 ''} 96 97 kafka.wait_for_unit("default.target") 98 kafka.wait_for_unit("apache-kafka.service") 99 kafka.wait_for_open_port(9092) 100 101 kafka.wait_until_succeeds( 102 "${kafkaPackage}/bin/kafka-topics.sh --create " 103 + "--bootstrap-server localhost:9092 --partitions 1 " 104 + "--replication-factor 1 --topic testtopic" 105 ) 106 kafka.succeed( 107 "echo 'test 1' | " 108 + "${kafkaPackage}/bin/kafka-console-producer.sh " 109 + "--bootstrap-server localhost:9092 --topic testtopic" 110 ) 111 assert "test 1" in kafka.succeed( 112 "${kafkaPackage}/bin/kafka-console-consumer.sh " 113 + "--bootstrap-server localhost:9092 --topic testtopic " 114 + "--from-beginning --max-messages 1" 115 ) 116 ''; 117 })); 118 119in 120with pkgs; 121{ 122 kafka_3_7 = makeKafkaTest "kafka_3_7" { 123 kafkaPackage = apacheKafka_3_7; 124 mode = "zookeeper"; 125 }; 126 kafka_3_8 = makeKafkaTest "kafka_3_8" { 127 kafkaPackage = apacheKafka_3_8; 128 mode = "zookeeper"; 129 }; 130 kafka_3_9 = makeKafkaTest "kafka_3_9" { 131 kafkaPackage = apacheKafka_3_9; 132 mode = "zookeeper"; 133 }; 134 kafka_4_0 = makeKafkaTest "kafka_4_0" { kafkaPackage = apacheKafka_4_0; }; 135 kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; }; 136}