# NixOS VM test for a four-node Apache Kafka cluster.
#
# Every kafkaN node runs with both the "broker" and "controller" process roles
# and takes part in the controller quorum. kafka3 and kafka4 share a
# `broker.rack` value so the test can check that a topic with replication
# factor 2 never places both replicas of a partition on that pair. A separate
# `client` node produces and consumes messages through different brokers.
import ../make-test-python.nix (
  { lib, pkgs, ... }:

  let
    inherit (lib) mkMerge;

    # Generate with `kafka-storage.sh random-uuid`
    clusterId = "ii5pZE5LRkSeWrnyBhMOYQ";

    # Configuration shared by every Kafka node; merged with the per-node
    # settings from `extraKafkaConfig` in `kafkaNodes` below.
    kafkaConfig = {
      networking.firewall.allowedTCPPorts = [
        9092 # PLAINTEXT listener
        9093 # CONTROLLER listener
      ];

      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      environment.systemPackages = [ pkgs.apacheKafka ];

      services.apache-kafka = {
        enable = true;

        inherit clusterId;

        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          # One "<node.id>@<hostname>:9093" entry per node. The id is read from
          # the node's own `node.id` setting so that it cannot drift from the
          # value the node actually registers with.
          "controller.quorum.voters" = lib.mapAttrsToList (
            name: nodeConfig:
            let
              nodeId = nodeConfig.services.apache-kafka.settings."node.id";
            in
            "${toString nodeId}@${name}:9093"
          ) extraKafkaConfig;
          "controller.listener.names" = [ "CONTROLLER" ];

          "process.roles" = [
            "broker"
            "controller"
          ];

          "log.dirs" = [ "/var/lib/apache-kafka" ];
          "num.partitions" = 6;
          "offsets.topic.replication.factor" = 2;
          "transaction.state.log.replication.factor" = 2;
          "transaction.state.log.min.isr" = 2;
        };
      };

      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        # Matches the "log.dirs" path configured above.
        serviceConfig.StateDirectory = "apache-kafka";
      };
    };

    # Per-node settings. kafka3 and kafka4 deliberately share a rack; the test
    # script relies on this for its replica placement check.
    extraKafkaConfig = {
      kafka1 = {
        services.apache-kafka.settings = {
          "node.id" = 1;
          "broker.rack" = 1;
        };
      };

      kafka2 = {
        services.apache-kafka.settings = {
          "node.id" = 2;
          "broker.rack" = 2;
        };
      };

      kafka3 = {
        services.apache-kafka.settings = {
          "node.id" = 3;
          "broker.rack" = 3;
        };
      };

      kafka4 = {
        services.apache-kafka.settings = {
          "node.id" = 4;
          "broker.rack" = 3;
        };
      };
    };

    # Full node definitions: per-node settings merged with the shared config.
    kafkaNodes = builtins.mapAttrs (
      _: val:
      mkMerge [
        val
        kafkaConfig
      ]
    ) extraKafkaConfig;
  in
  {
    name = "kafka-cluster";
    meta.maintainers = with lib.maintainers; [ jpds ];

    nodes = {
      inherit (kafkaNodes)
        kafka1
        kafka2
        kafka3
        kafka4
        ;

      # Only needs the Kafka command line tools; it runs no Kafka service.
      client = {
        environment.systemPackages = [ pkgs.apacheKafka ];
        virtualisation.diskSize = 1024;
      };
    };

    testScript = ''
      import json

      brokers = [kafka1, kafka2, kafka3, kafka4]

      for machine in brokers:
        machine.wait_for_unit("apache-kafka")

      for machine in brokers:
        machine.wait_for_open_port(9092)
        machine.wait_for_open_port(9093)

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
        )

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
        )

        # Raw string: the grep pattern contains a backslash followed by a plus
        # sign, which is not a valid escape sequence in a regular Python string.
        machine.wait_until_succeeds(
          r"journalctl -o cat -u apache-kafka.service | grep 'BrokerLifecycleManager' | grep 'Incarnation [[:graph:]]\+ of broker [[:digit:]] in cluster ${clusterId}'"
        )

      current_voters_json = kafka1.wait_until_succeeds(
        "kafka-metadata-quorum.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 describe --status | grep CurrentVoters"
      ).replace("CurrentVoters:", "")

      voters = json.loads(current_voters_json)

      # Every node is listed in controller.quorum.voters.
      assert len(voters) == len(brokers)

      kafka1.wait_until_succeeds(
        "kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic test-123 --replication-factor 2"
      )

      for machine in brokers:
        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep -E 'Created log for partition test-123-[[:digit:]] in /var/lib/apache-kafka/test-123-[[:digit:]] with properties'"
        )

      kafka1.wait_until_succeeds(
        "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
        + "grep 'PartitionCount: 6'"
      )

      # Should never see a replica on both 3 and 4 as they're in the same rack
      kafka1.fail(
        "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
        + "grep -E 'Replicas: (3,4|4,3)'"
      )

      # Produce through one broker and consume through another.
      client.succeed(
        "echo 'test 2' | "
        + "kafka-console-producer.sh "
        + "--bootstrap-server kafka1:9092 "
        + "--topic test-123"
      )
      assert "test 2" in client.succeed(
        "kafka-console-consumer.sh "
        + "--bootstrap-server kafka2:9092 --topic test-123 "
        + "--group readtest "
        + "--from-beginning --max-messages 1"
      )

      # Same consumer group, this time without --from-beginning.
      client.succeed(
        "echo 'test 3' | "
        + "kafka-console-producer.sh "
        + "--bootstrap-server kafka2:9092 "
        + "--topic test-123"
      )
      assert "test 3" in client.succeed(
        "kafka-console-consumer.sh "
        + "--bootstrap-server kafka3:9092 --topic test-123 "
        + "--group readtest "
        + "--max-messages 1"
      )
    '';
  }
)