1import ../make-test-python.nix (
2 { lib, pkgs, ... }:
3
4 let
5 inherit (lib) mkMerge;
6
    # Fixed KRaft cluster IDs (base64url-encoded UUIDs), one per independent
    # single-node cluster, so log-dir formatting is reproducible across runs.
    # Generate with `kafka-storage.sh random-uuid`
    clusterAId = "ihzlrasUQ9O3Yy0ZWYkd6w";

    clusterBId = "Bnu_zrzKRH6-7KcK7t3I5Q";
11
    # Base NixOS configuration shared by every Kafka node: a single-node
    # Apache Kafka instance in KRaft combined (broker + controller) mode.
    kafkaConfig = {
      # 9092: PLAINTEXT client listener; 9093: KRaft controller listener.
      networking.firewall.allowedTCPPorts = [
        9092
        9093
      ];

      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      # Put the Kafka CLI tools (kafka-topics.sh, ...) on PATH for testScript.
      environment.systemPackages = [ pkgs.apacheKafka ];

      services.apache-kafka = {
        enable = true;

        # Format the log dirs with the cluster ID on first start (required
        # before a KRaft node can boot).
        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];

          # Combined mode: each node is both broker and KRaft controller.
          "process.roles" = [
            "broker"
            "controller"
          ];

          "log.dirs" = [ "/var/lib/apache-kafka" ];
          "num.partitions" = 1;
          # Single-node clusters: all internal topics use replication factor 1.
          "offsets.topic.replication.factor" = 1;
          "transaction.state.log.replication.factor" = 1;
          "transaction.state.log.min.isr" = 1;
        };
      };

      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        # Let systemd own /var/lib/apache-kafka (matches log.dirs above).
        serviceConfig.StateDirectory = "apache-kafka";
      };
    };
58
59 extraKafkaConfig = {
60 kafkaa1 = {
61 services.apache-kafka = {
62 clusterId = "${clusterAId}";
63
64 settings = {
65 "node.id" = 1;
66 "controller.quorum.voters" = [ "1@kafkaa1:9093" ];
67 };
68 };
69 };
70
71 kafkab1 = {
72 services.apache-kafka = {
73 clusterId = "${clusterBId}";
74
75 settings = {
76 "node.id" = 1;
77 "controller.quorum.voters" = [ "1@kafkab1:9093" ];
78 };
79 };
80 };
81 };
82
83 kafkaNodes = builtins.mapAttrs (
84 _: val:
85 mkMerge [
86 val
87 kafkaConfig
88 ]
89 ) extraKafkaConfig;
90
    # MirrorMaker 2 (Connect-based) configuration: replicate every topic
    # one-way A -> B with the identity replication policy (no "A." prefix on
    # mirrored topic names). All replication factors are 1 because each
    # cluster is a single node. The string below is written verbatim to the
    # properties file consumed by connect-mirror-maker.sh.
    mirrorMakerProperties = pkgs.writeText "mm2.properties" ''
      name = A->B

      clusters = A, B

      A.bootstrap.servers = kafkaa1:9092
      B.bootstrap.servers = kafkab1:9092

      A->B.enabled = true
      A->B.topics = .*

      B->A.enabled = false
      B->A.topics = .*

      replication.factor=1
      replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

      tasks.max = 2
      refresh.topics.enabled = true
      refresh.topics.interval.seconds = 5
      sync.topic.configs.enabled = true

      checkpoints.topic.replication.factor=1
      heartbeats.topic.replication.factor=1
      offset-syncs.topic.replication.factor=1

      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      config.storage.replication.factor=1

      emit.checkpoints.enabled = true
      emit.checkpoints.interval.seconds = 5
    '';
124 in
125 {
    name = "kafka-mirrormaker";
    # Test metadata.
    meta = with pkgs.lib.maintainers; {
      maintainers = [ jpds ];
    };
130
131 nodes = {
132 inherit (kafkaNodes) kafkaa1 kafkab1;
133
134 mirrormaker =
135 { config, ... }:
136 {
137 virtualisation.diskSize = 1024;
138 virtualisation.memorySize = 1024 * 2;
139
140 # Define a mirrormaker systemd service
141 systemd.services.kafka-connect-mirror-maker = {
142 after = [ "network-online.target" ];
143 requires = [ "network-online.target" ];
144 wantedBy = [ "multi-user.target" ];
145
146 serviceConfig = {
147 ExecStart = ''
148 ${pkgs.apacheKafka}/bin/connect-mirror-maker.sh ${mirrorMakerProperties}
149 '';
150 Restart = "on-failure";
151 RestartSec = "5s";
152 };
153 };
154 };
155 };
156
157 testScript = ''
158 import json
159
160 for machine in kafkaa1, kafkab1:
161 machine.wait_for_unit("apache-kafka")
162
163 for machine in kafkaa1, kafkab1:
164 machine.wait_for_open_port(9092)
165 machine.wait_for_open_port(9093)
166
167 machine.wait_until_succeeds(
168 "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
169 )
170
171 machine.wait_until_succeeds(
172 "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
173 )
174
175 for machine in kafkaa1, kafkab1:
176 current_voters_json = machine.wait_until_succeeds(
177 f"kafka-metadata-quorum.sh --bootstrap-server {machine.name}:9092 describe --status | grep CurrentVoters"
178 ).replace("CurrentVoters:", "")
179
180 voters = json.loads(current_voters_json)
181
182 assert len(voters) == 1
183
184 mirrormaker.wait_for_unit("kafka-connect-mirror-maker")
185
186 mirrormaker.wait_until_succeeds(
187 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Kafka MirrorMaker initializing'"
188 )
189 mirrormaker.wait_until_succeeds(
190 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Targeting clusters \[A, B\]'"
191 )
192 mirrormaker.wait_until_succeeds(
193 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[Worker clientId=A->B, groupId=A-mm2\] Finished starting connectors and tasks'"
194 )
195
196 mirrormaker.wait_until_succeeds(
197 """
198 journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[MirrorSourceConnector\|task-0\] \[Producer clientId=A->B\|A->B-0\|offset-syncs-source-producer\] Cluster ID: ${clusterAId}'
199 """
200 )
201
202 kafkaa1.wait_until_succeeds(
203 "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group B-mm2'"
204 )
205
206 kafkab1.wait_until_succeeds(
207 "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group A-mm2'"
208 )
209
210 kafkaa1.wait_until_succeeds(
211 "kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mm-1 --partitions 1 --replication-factor 1"
212 )
213
214 for machine in kafkaa1, kafkab1:
215 machine.succeed(
216 "kafka-topics.sh --bootstrap-server localhost:9092 --list | grep 'test-mm-1'"
217 )
218
219 mirrormaker.wait_until_succeeds(
220 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'replicating [[:digit:]]\+ topic-partitions A->B: \[test-mm-1-0\]'"
221 )
222
223 mirrormaker.wait_until_succeeds(
224 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Found [[:digit:]]\+ new topic-partitions on A'"
225 )
226
227 kafkaa1.wait_until_succeeds(
228 "kafka-verifiable-producer.sh --bootstrap-server kafkaa1:9092 --throughput 10 --max-messages 100 --topic test-mm-1"
229 )
230
231 mirrormaker.wait_until_succeeds(
232 "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Committing offsets for [[:digit:]]\+ acknowledged messages'"
233 )
234
235 kafkab1.wait_until_succeeds(
236 "kafka-verifiable-consumer.sh --bootstrap-server kafkab1:9092 --topic test-mm-1 --group-id testreplication --max-messages 100"
237 )
238 '';
239 }
240)