# at 22.05-pre, 2.4 kB (file-viewer header; not part of the Nix expression)
# NixOS VM integration tests for Apache Kafka.
#
# Evaluates to an attribute set holding one test per Kafka package listed at
# the bottom of the file. Every test defines two nodes, `zookeeper1` and
# `kafka`, creates a topic, produces a single message and asserts that the
# console consumer reads that message back.
#
# Arguments:
#   system - platform the tests are instantiated for
#            (default: builtins.currentSystem)
#   config - nixpkgs configuration forwarded to the package-set import
#   pkgs   - package set; imported from ../.. with `system` and `config`
#            unless the caller supplies one
{ system ? builtins.currentSystem
, config ? { }
, pkgs ? import ../.. { inherit system config; }
}:

with pkgs.lib;

let
  # Build the test for one Kafka package.
  #   name         - test name, passed through to make-test-python.nix
  #   kafkaPackage - Kafka derivation used both by the broker service and for
  #                  the command-line tools invoked from the test script
  makeKafkaTest = name: kafkaPackage: (import ./make-test-python.nix ({
    inherit name;
    meta = with pkgs.lib.maintainers; {
      maintainers = [ nequissimus ];
    };

    nodes = {
      zookeeper1 = { ... }: {
        services.zookeeper = {
          enable = true;
        };

        # The broker on the other node connects to ZooKeeper on this port.
        networking.firewall.allowedTCPPorts = [ 2181 ];
      };

      kafka = { ... }: {
        services.apache-kafka = {
          enable = true;
          extraProperties = ''
            offsets.topic.replication.factor = 1
            zookeeper.session.timeout.ms = 600000
          '';
          package = kafkaPackage;
          zookeeper = "zookeeper1:2181";
        };

        networking.firewall.allowedTCPPorts = [ 9092 ];
        # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
        virtualisation.memorySize = 2047;
      };
    };

    # All command-line tools talk to the broker via --bootstrap-server:
    # `kafka-topics.sh --zookeeper` is deprecated since Kafka 2.2 and removed
    # in 3.0, and `kafka-console-producer.sh --broker-list` is deprecated
    # since 2.5. Every Kafka version instantiated below supports the
    # replacement flag, so no per-version branching is required.
    testScript = ''
      start_all()

      zookeeper1.wait_for_unit("default.target")
      zookeeper1.wait_for_unit("zookeeper.service")
      zookeeper1.wait_for_open_port(2181)

      kafka.wait_for_unit("default.target")
      kafka.wait_for_unit("apache-kafka.service")
      kafka.wait_for_open_port(9092)

      # Retried until it succeeds: an open port does not yet guarantee that
      # the broker accepts topic creation.
      kafka.wait_until_succeeds(
          "${kafkaPackage}/bin/kafka-topics.sh --create "
          + "--bootstrap-server localhost:9092 --partitions 1 "
          + "--replication-factor 1 --topic testtopic"
      )
      kafka.succeed(
          "echo 'test 1' | "
          + "${kafkaPackage}/bin/kafka-console-producer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic"
      )
      assert "test 1" in kafka.succeed(
          "${kafkaPackage}/bin/kafka-console-consumer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic "
          + "--from-beginning --max-messages 1"
      )
    '';
  }) { inherit system; });

in
with pkgs; {
  kafka_2_7 = makeKafkaTest "kafka_2_7" apacheKafka_2_7;
  kafka_2_8 = makeKafkaTest "kafka_2_8" apacheKafka_2_8;
}