import ../make-test-python.nix (
···
    inherit (lib) mkMerge;
···
    # Generate with `kafka-storage.sh random-uuid`
    clusterAId = "ihzlrasUQ9O3Yy0ZWYkd6w";
    clusterBId = "Bnu_zrzKRH6-7KcK7t3I5Q";
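
    # Two independent single-broker KRaft clusters: A is the replication source,
    # B the target for MirrorMaker 2.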
···
      networking.firewall.allowedTCPPorts = [
···
      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      environment.systemPackages = [ pkgs.apacheKafka ];
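
      # pkgs.apacheKafka also puts the Kafka CLI tools (kafka-topics.sh,
      # kafka-metadata-quorum.sh, kafka-verifiable-producer.sh/-consumer.sh)
      # on PATH for the test script below.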
      services.apache-kafka = {
···
        formatLogDirs = true;
···
            "CONTROLLER://:9093"
          ];
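
          # Clients connect via PLAINTEXT on 9092; 9093 is the KRaft CONTROLLER listener.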
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];
···
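          # Single-broker clusters: the internal offsets and transaction topics run
          # with replication factor 1.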
          "log.dirs" = [ "/var/lib/apache-kafka" ];
          "num.partitions" = 1;
          "offsets.topic.replication.factor" = 1;
          "transaction.state.log.replication.factor" = 1;
          "transaction.state.log.min.isr" = 1;
···
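      # StateDirectory= has systemd create /var/lib/apache-kafka, matching "log.dirs" above.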
      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        serviceConfig.StateDirectory = "apache-kafka";
···
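    # Per-cluster overrides: each VM gets its own cluster ID and forms its own
    # single-voter KRaft controller quorum.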
    extraKafkaConfig = {
      kafkaa1 = {
        services.apache-kafka = {
          clusterId = "${clusterAId}";
···
            "controller.quorum.voters" = [ "1@kafkaa1:9093" ];
···
      kafkab1 = {
        services.apache-kafka = {
          clusterId = "${clusterBId}";
···
            "controller.quorum.voters" = [ "1@kafkab1:9093" ];
···
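    # Presumably each entry of extraKafkaConfig is merged (mkMerge) with the shared
    # node configuration above to form the final kafkaa1/kafkab1 machines.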
    kafkaNodes = builtins.mapAttrs (
···
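    # MirrorMaker 2 (dedicated mode) configuration: aliases A and B point at the
    # two brokers, and only the A->B replication flow is enabled.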
    mirrorMakerProperties = pkgs.writeText "mm2.properties" ''
···
      A.bootstrap.servers = kafkaa1:9092
      B.bootstrap.servers = kafkab1:9092
···
      B->A.enabled = false
···
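      # IdentityReplicationPolicy keeps replicated topic names unchanged on the
      # target (no "A." prefix), so test-mm-1 keeps its name on cluster B.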
      replication.factor=1
      replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
···
      refresh.topics.enabled = true
      refresh.topics.interval.seconds = 5
      sync.topic.configs.enabled = true
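
      # MirrorMaker polls for new topics every 5 seconds; its internal topics get
      # replication factor 1 to fit the single-broker clusters.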
      checkpoints.topic.replication.factor=1
      heartbeats.topic.replication.factor=1
      offset-syncs.topic.replication.factor=1

      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      config.storage.replication.factor=1

      emit.checkpoints.enabled = true
      emit.checkpoints.interval.seconds = 5
    '';
···
    name = "kafka-mirrormaker";
    meta = with pkgs.lib.maintainers; {
      maintainers = [ jpds ];
···
      inherit (kafkaNodes) kafkaa1 kafkab1;
···
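      # A third VM runs the MirrorMaker 2 Connect worker.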
        virtualisation.diskSize = 1024;
        virtualisation.memorySize = 1024 * 2;

        # Define a mirrormaker systemd service
        systemd.services.kafka-connect-mirror-maker = {
          after = [ "network-online.target" ];
          requires = [ "network-online.target" ];
          wantedBy = [ "multi-user.target" ];
···
            ${pkgs.apacheKafka}/bin/connect-mirror-maker.sh ${mirrorMakerProperties}
···
            Restart = "on-failure";
···
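      # Test flow: wait for both KRaft brokers and the MirrorMaker worker, create
      # test-mm-1 on cluster A, confirm it is listed on both clusters, then produce
      # 100 messages to A and consume the same 100 from B.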
      for machine in kafkaa1, kafkab1:
          machine.wait_for_unit("apache-kafka")

      for machine in kafkaa1, kafkab1:
          machine.wait_for_open_port(9092)
          machine.wait_for_open_port(9093)

          machine.wait_until_succeeds(
              "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
          )

          machine.wait_until_succeeds(
              "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
          )
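
      # Each single-broker cluster should report exactly one KRaft quorum voter: itself.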
      for machine in kafkaa1, kafkab1:
          current_voters_json = machine.wait_until_succeeds(
              f"kafka-metadata-quorum.sh --bootstrap-server {machine.name}:9092 describe --status | grep CurrentVoters"
          ).replace("CurrentVoters:", "")

          voters = json.loads(current_voters_json)

          assert len(voters) == 1

      mirrormaker.wait_for_unit("kafka-connect-mirror-maker")
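
      # The worker logs that it initialized, targets clusters A and B, started its
      # connectors, and connected its source producer to cluster A.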
      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Kafka MirrorMaker initializing'"
      )

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Targeting clusters \[A, B\]'"
      )

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[Worker clientId=A->B, groupId=A-mm2\] Finished starting connectors and tasks'"
      )

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[MirrorSourceConnector\|task-0\] \[Producer clientId=A->B\|A->B-0\|offset-syncs-source-producer\] Cluster ID: ${clusterAId}'"
      )
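
      # Both brokers report the MirrorMaker Connect worker groups (A-mm2 / B-mm2)
      # as stabilized.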
      kafkaa1.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group B-mm2'"
      )

      kafkab1.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group A-mm2'"
      )
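
      # Create a topic on cluster A; MirrorMaker should replicate it to cluster B.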
      kafkaa1.wait_until_succeeds(
          "kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mm-1 --partitions 1 --replication-factor 1"
      )

      for machine in kafkaa1, kafkab1:
          machine.wait_until_succeeds(
              "kafka-topics.sh --bootstrap-server localhost:9092 --list | grep 'test-mm-1'"
          )
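
      # MirrorMaker logs that it found and started replicating the new topic-partition.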
      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'replicating [[:digit:]]\+ topic-partitions A->B: \[test-mm-1-0\]'"
      )

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Found [[:digit:]]\+ new topic-partitions on A'"
      )
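
      # End-to-end check: produce 100 messages to A and consume the same 100 from B.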
      kafkaa1.wait_until_succeeds(
          "kafka-verifiable-producer.sh --bootstrap-server kafkaa1:9092 --throughput 10 --max-messages 100 --topic test-mm-1"
      )

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Committing offsets for [[:digit:]]\+ acknowledged messages'"
      )

      kafkab1.wait_until_succeeds(
          "kafka-verifiable-consumer.sh --bootstrap-server kafkab1:9092 --topic test-mm-1 --group-id testreplication --max-messages 100"
      )