at 24.11-pre 3.8 kB view raw
# NixOS VM tests for the `services.apache-kafka` module.
#
# Evaluates to an attribute set with one test per Kafka package, plus one
# extra test that runs the default package in KRaft mode.
#
# Arguments:
#   system - platform to build the tests for (defaults to the current system)
#   config - configuration passed through to the package set import
#   pkgs   - package set; by default imported from ../.. with the two above
{ system ? builtins.currentSystem,
  config ? {},
  pkgs ? import ../.. { inherit system config; }
}:

let
  # Bring in only the lib functions this file uses instead of a file-wide
  # `with pkgs.lib;`, so every identifier's origin is visible.
  inherit (pkgs.lib) mkIf mkMerge optionalAttrs optionalString;

  # makeKafkaTest name { kafkaPackage, mode ? "zookeeper" }
  #
  # Builds one VM test via ./make-test-python.nix.
  #   name         - test name
  #   kafkaPackage - Kafka package used both for the service and for the CLI
  #                  tools invoked from the test script
  #   mode         - "zookeeper" (default) adds a separate `zookeeper1` node
  #                  and points the broker at it; "kraft" configures the
  #                  single `kafka` node as combined broker + controller
  makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }: (import ./make-test-python.nix ({
    inherit name;
    meta = with pkgs.lib.maintainers; {
      maintainers = [ nequissimus ];
    };

    nodes = {
      kafka = { ... }: {
        services.apache-kafka = mkMerge [
          # Settings shared by both modes.
          ({
            enable = true;
            package = kafkaPackage;
            settings = {
              "offsets.topic.replication.factor" = 1;
              "log.dirs" = [
                "/var/lib/kafka/logdir1"
                "/var/lib/kafka/logdir2"
              ];
            };
          })
          # ZooKeeper mode: connect to the `zookeeper1` node defined below.
          (mkIf (mode == "zookeeper") {
            settings = {
              "zookeeper.session.timeout.ms" = 600000;
              "zookeeper.connect" = [ "zookeeper1:2181" ];
            };
          })
          # KRaft mode: this node is both broker (9092) and controller (9093).
          (mkIf (mode == "kraft") {
            clusterId = "ak2fIHr4S8WWarOF_ODD0g";
            formatLogDirs = true;
            settings = {
              "node.id" = 1;
              "process.roles" = [
                "broker"
                "controller"
              ];
              "listeners" = [
                "PLAINTEXT://:9092"
                "CONTROLLER://:9093"
              ];
              "listener.security.protocol.map" = [
                "PLAINTEXT:PLAINTEXT"
                "CONTROLLER:PLAINTEXT"
              ];
              "controller.quorum.voters" = [
                "1@kafka:9093"
              ];
              "controller.listener.names" = [ "CONTROLLER" ];
            };
          })
        ];

        networking.firewall.allowedTCPPorts = [ 9092 9093 ];
        # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
        virtualisation.memorySize = 2047;
      };
    } // optionalAttrs (mode == "zookeeper") {
      # Only present in ZooKeeper mode.
      zookeeper1 = { ... }: {
        services.zookeeper = {
          enable = true;
        };

        networking.firewall.allowedTCPPorts = [ 2181 ];
      };
    };

    # Test flow: wait for the services, create a topic, produce one message,
    # and assert that it can be consumed again.
    #
    # All three CLI calls use `--bootstrap-server`. The producer previously
    # used `--broker-list`, which is deprecated in favour of
    # `--bootstrap-server` (KIP-499); using the same flag everywhere keeps the
    # unversioned `apacheKafka` tests working when the deprecated flag is gone.
    testScript = ''
      start_all()

      ${optionalString (mode == "zookeeper") ''
        zookeeper1.wait_for_unit("default.target")
        zookeeper1.wait_for_unit("zookeeper.service")
        zookeeper1.wait_for_open_port(2181)
      ''}

      kafka.wait_for_unit("default.target")
      kafka.wait_for_unit("apache-kafka.service")
      kafka.wait_for_open_port(9092)

      kafka.wait_until_succeeds(
          "${kafkaPackage}/bin/kafka-topics.sh --create "
          + "--bootstrap-server localhost:9092 --partitions 1 "
          + "--replication-factor 1 --topic testtopic"
      )
      kafka.succeed(
          "echo 'test 1' | "
          + "${kafkaPackage}/bin/kafka-console-producer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic"
      )
      assert "test 1" in kafka.succeed(
          "${kafkaPackage}/bin/kafka-console-consumer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic "
          + "--from-beginning --max-messages 1"
      )
    '';
  }) { inherit system; });

in with pkgs; {
  # One test per packaged Kafka release (default ZooKeeper mode).
  kafka_2_8 = makeKafkaTest "kafka_2_8" { kafkaPackage = apacheKafka_2_8; };
  kafka_3_0 = makeKafkaTest "kafka_3_0" { kafkaPackage = apacheKafka_3_0; };
  kafka_3_1 = makeKafkaTest "kafka_3_1" { kafkaPackage = apacheKafka_3_1; };
  kafka_3_2 = makeKafkaTest "kafka_3_2" { kafkaPackage = apacheKafka_3_2; };
  kafka_3_3 = makeKafkaTest "kafka_3_3" { kafkaPackage = apacheKafka_3_3; };
  kafka_3_4 = makeKafkaTest "kafka_3_4" { kafkaPackage = apacheKafka_3_4; };
  kafka_3_5 = makeKafkaTest "kafka_3_5" { kafkaPackage = apacheKafka_3_5; };
  # Default package, in ZooKeeper mode and in KRaft mode.
  kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
  kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; };
}