# at master 4.0 kB view raw
# NixOS VM tests for the Apache Kafka packages.
#
# Evaluates to an attribute set containing one test per Kafka package listed at
# the bottom of this file. Every test boots a `kafka` node (plus a `zookeeper1`
# node in ZooKeeper mode), creates a topic, produces one message and reads it
# back with the console consumer.
{ pkgs, ... }:

let
  inherit (pkgs.lib)
    assertOneOf
    mkIf
    mkMerge
    optionalAttrs
    optionalString
    ;

  # Coordination modes that makeKafkaTest knows how to configure.
  supportedModes = [
    "kraft"
    "zookeeper"
  ];

  # Build one Kafka VM test.
  #
  # Arguments:
  #   name          Test name, handed to make-test-python.nix unchanged.
  #   kafkaPackage  Kafka package under test. It is used as the service package
  #                 and as the location of the CLI scripts run by the test.
  #   mode          "kraft" (default) or "zookeeper". Any other value aborts
  #                 evaluation.
  makeKafkaTest =
    name:
    {
      kafkaPackage,
      mode ? "kraft",
    }:
    # For an unrecognised mode neither conditional settings block below nor the
    # zookeeper1 node would be enabled, leaving a broker with no coordination
    # settings at all. Fail at evaluation time instead of inside the VM.
    assert assertOneOf "mode" mode supportedModes;
    let
      useZookeeper = mode == "zookeeper";
      useKraft = mode == "kraft";

      # Service options applied in every mode.
      commonService = {
        enable = true;
        package = kafkaPackage;
        settings = {
          "offsets.topic.replication.factor" = 1;
          "log.dirs" = [
            "/var/lib/kafka/logdir1"
            "/var/lib/kafka/logdir2"
          ];
        };
      };

      # Extra options for ZooKeeper mode; the host name matches the
      # `zookeeper1` node that is added to `nodes` in this mode.
      zookeeperService = {
        settings = {
          "zookeeper.session.timeout.ms" = 600000;
          "zookeeper.connect" = [ "zookeeper1:2181" ];
        };
      };

      # Extra options for KRaft mode: the single node acts as both broker and
      # controller and is the only quorum voter.
      kraftService = {
        clusterId = "ak2fIHr4S8WWarOF_ODD0g";
        formatLogDirs = true;
        settings = {
          "node.id" = 1;
          "process.roles" = [
            "broker"
            "controller"
          ];
          "listeners" = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          "controller.quorum.voters" = [
            "1@kafka:9093"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];
        };
      };

      kafkaNode =
        { ... }:
        {
          services.apache-kafka = mkMerge [
            commonService
            (mkIf useZookeeper zookeeperService)
            (mkIf useKraft kraftService)
          ];

          # 9092 is the port the test script talks to; 9093 is the CONTROLLER
          # listener configured in KRaft mode.
          networking.firewall.allowedTCPPorts = [
            9092
            9093
          ];
          virtualisation.diskSize = 1024;
          # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
          virtualisation.memorySize = 2047;
        };

      zookeeperNode =
        { ... }:
        {
          services.zookeeper = {
            enable = true;
          };

          networking.firewall.allowedTCPPorts = [ 2181 ];
          virtualisation.diskSize = 1024;
        };
    in
    import ../make-test-python.nix {
      inherit name;
      meta = with pkgs.lib.maintainers; {
        maintainers = [ nequissimus ];
      };

      # The ZooKeeper node only exists in ZooKeeper mode.
      nodes = {
        kafka = kafkaNode;
      }
      // optionalAttrs useZookeeper {
        zookeeper1 = zookeeperNode;
      };

      # Wait for the services (ZooKeeper first, when present), then create a
      # topic, produce "test 1" and check that the consumer reads it back.
      testScript = ''
        start_all()

        ${optionalString useZookeeper ''
          zookeeper1.wait_for_unit("default.target")
          zookeeper1.wait_for_unit("zookeeper.service")
          zookeeper1.wait_for_open_port(2181)
        ''}

        kafka.wait_for_unit("default.target")
        kafka.wait_for_unit("apache-kafka.service")
        kafka.wait_for_open_port(9092)

        kafka.wait_until_succeeds(
            "${kafkaPackage}/bin/kafka-topics.sh --create "
            + "--bootstrap-server localhost:9092 --partitions 1 "
            + "--replication-factor 1 --topic testtopic"
        )
        kafka.succeed(
            "echo 'test 1' | "
            + "${kafkaPackage}/bin/kafka-console-producer.sh "
            + "--bootstrap-server localhost:9092 --topic testtopic"
        )
        assert "test 1" in kafka.succeed(
            "${kafkaPackage}/bin/kafka-console-consumer.sh "
            + "--bootstrap-server localhost:9092 --topic testtopic "
            + "--from-beginning --max-messages 1"
        )
      '';
    };

in
{
  # The 3.x packages are tested in ZooKeeper mode.
  kafka_3_7 = makeKafkaTest "kafka_3_7" {
    kafkaPackage = pkgs.apacheKafka_3_7;
    mode = "zookeeper";
  };
  kafka_3_8 = makeKafkaTest "kafka_3_8" {
    kafkaPackage = pkgs.apacheKafka_3_8;
    mode = "zookeeper";
  };
  kafka_3_9 = makeKafkaTest "kafka_3_9" {
    kafkaPackage = pkgs.apacheKafka_3_9;
    mode = "zookeeper";
  };

  # The remaining packages use the default mode ("kraft").
  kafka_4_0 = makeKafkaTest "kafka_4_0" { kafkaPackage = pkgs.apacheKafka_4_0; };
  kafka_4_1 = makeKafkaTest "kafka_4_1" { kafkaPackage = pkgs.apacheKafka_4_1; };
  kafka = makeKafkaTest "kafka" { kafkaPackage = pkgs.apacheKafka; };
}