1import ../make-test-python.nix (
2 { lib, pkgs, ... }:
3
4 let
5 inherit (lib) mkMerge;
6
7 # Generate with `kafka-storage.sh random-uuid`
8 clusterId = "ii5pZE5LRkSeWrnyBhMOYQ";
9
10 kafkaConfig = {
11 networking.firewall.allowedTCPPorts = [
12 9092
13 9093
14 ];
15
16 virtualisation.diskSize = 1024;
17 virtualisation.memorySize = 1024 * 2;
18
19 environment.systemPackages = [ pkgs.apacheKafka ];
20
21 services.apache-kafka = {
22 enable = true;
23
24 clusterId = "${clusterId}";
25
26 formatLogDirs = true;
27
28 settings = {
29 listeners = [
30 "PLAINTEXT://:9092"
31 "CONTROLLER://:9093"
32 ];
33 "listener.security.protocol.map" = [
34 "PLAINTEXT:PLAINTEXT"
35 "CONTROLLER:PLAINTEXT"
36 ];
37 "controller.quorum.voters" = lib.imap1 (i: name: "${toString i}@${name}:9093") (
38 builtins.attrNames kafkaNodes
39 );
40 "controller.listener.names" = [ "CONTROLLER" ];
41
42 "process.roles" = [
43 "broker"
44 "controller"
45 ];
46
47 "log.dirs" = [ "/var/lib/apache-kafka" ];
48 "num.partitions" = 6;
49 "offsets.topic.replication.factor" = 2;
50 "transaction.state.log.replication.factor" = 2;
51 "transaction.state.log.min.isr" = 2;
52 };
53 };
54
55 systemd.services.apache-kafka = {
56 after = [ "network-online.target" ];
57 requires = [ "network-online.target" ];
58 serviceConfig.StateDirectory = "apache-kafka";
59 };
60 };
61
62 extraKafkaConfig = {
63 kafka1 = {
64 services.apache-kafka.settings = {
65 "node.id" = 1;
66 "broker.rack" = 1;
67 };
68 };
69
70 kafka2 = {
71 services.apache-kafka.settings = {
72 "node.id" = 2;
73 "broker.rack" = 2;
74 };
75 };
76
77 kafka3 = {
78 services.apache-kafka.settings = {
79 "node.id" = 3;
80 "broker.rack" = 3;
81 };
82 };
83
84 kafka4 = {
85 services.apache-kafka.settings = {
86 "node.id" = 4;
87 "broker.rack" = 3;
88 };
89 };
90 };
91
92 kafkaNodes = builtins.mapAttrs (
93 _: val:
94 mkMerge [
95 val
96 kafkaConfig
97 ]
98 ) extraKafkaConfig;
99 in
100 {
101 name = "kafka-cluster";
102 meta = with pkgs.lib.maintainers; {
103 maintainers = [ jpds ];
104 };
105
106 nodes = {
107 inherit (kafkaNodes)
108 kafka1
109 kafka2
110 kafka3
111 kafka4
112 ;
113
114 client =
115 { config, ... }:
116 {
117 environment.systemPackages = [ pkgs.apacheKafka ];
118 virtualisation.diskSize = 1024;
119 };
120 };
121
122 testScript = ''
123 import json
124
125 for machine in kafka1, kafka2, kafka3, kafka4:
126 machine.wait_for_unit("apache-kafka")
127
128 for machine in kafka1, kafka2, kafka3, kafka4:
129 machine.wait_for_open_port(9092)
130 machine.wait_for_open_port(9093)
131
132 machine.wait_until_succeeds(
133 "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
134 )
135
136 machine.wait_until_succeeds(
137 "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
138 )
139
140 machine.wait_until_succeeds(
141 "journalctl -o cat -u apache-kafka.service | grep 'BrokerLifecycleManager' | grep 'Incarnation [[:graph:]]\+ of broker [[:digit:]] in cluster ${clusterId}'"
142 )
143
144 current_voters_json = kafka1.wait_until_succeeds(
145 "kafka-metadata-quorum.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 describe --status | grep CurrentVoters"
146 ).replace("CurrentVoters:", "")
147
148 voters = json.loads(current_voters_json)
149
150 assert len(voters) == 4
151
152 kafka1.wait_until_succeeds(
153 "kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic test-123 --replication-factor 2"
154 )
155
156 for machine in kafka1, kafka2, kafka3, kafka4:
157 machine.wait_until_succeeds(
158 "journalctl -o cat -u apache-kafka.service | grep -E 'Created log for partition test-123-[[:digit:]] in /var/lib/apache-kafka/test-123-[[:digit:]] with properties'"
159 )
160
161 kafka1.wait_until_succeeds(
162 "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
163 + "grep 'PartitionCount: 6'"
164 )
165
166 # Should never see a replica on both 3 and 4 as they're in the same rack
167 kafka1.fail(
168 "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
169 + "grep -E 'Replicas: (3,4|4,3)'"
170 )
171
172 client.succeed(
173 "echo 'test 2' | "
174 + "kafka-console-producer.sh "
175 + "--bootstrap-server kafka1:9092 "
176 + "--topic test-123"
177 )
178 assert "test 2" in client.succeed(
179 "kafka-console-consumer.sh "
180 + "--bootstrap-server kafka2:9092 --topic test-123 "
181 + "--group readtest "
182 + "--from-beginning --max-messages 1"
183 )
184
185 client.succeed(
186 "echo 'test 3' | "
187 + "kafka-console-producer.sh "
188 + "--bootstrap-server kafka2:9092 "
189 + "--topic test-123"
190 )
191 assert "test 3" in client.succeed(
192 "kafka-console-consumer.sh "
193 + "--bootstrap-server kafka3:9092 --topic test-123 "
194 + "--group readtest "
195 + "--max-messages 1"
196 )
197 '';
198 }
199)