# NixOS VM test: two independent single-node Kafka clusters (A on `kafkaa1`,
# B on `kafkab1`) plus a `mirrormaker` node running MirrorMaker 2, configured
# to replicate topics one-way from A to B.
import ../make-test-python.nix (
  { lib, pkgs, ... }:

  let
    inherit (lib) mkMerge;

    # Generate with `kafka-storage.sh random-uuid`
    clusterAId = "ihzlrasUQ9O3Yy0ZWYkd6w";

    clusterBId = "Bnu_zrzKRH6-7KcK7t3I5Q";

    # Configuration shared by both Kafka nodes. Each node runs with both the
    # "broker" and "controller" roles and formats its own log directory.
    kafkaConfig = {
      networking.firewall.allowedTCPPorts = [
        9092
        9093
      ];

      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      # Puts the Kafka CLI scripts used by the test script on PATH.
      environment.systemPackages = [ pkgs.apacheKafka ];

      services.apache-kafka = {
        enable = true;

        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];

          "process.roles" = [
            "broker"
            "controller"
          ];

          "log.dirs" = [ "/var/lib/apache-kafka" ];
          # Single-node clusters: every replication factor / ISR must be 1.
          "num.partitions" = 1;
          "offsets.topic.replication.factor" = 1;
          "transaction.state.log.replication.factor" = 1;
          "transaction.state.log.min.isr" = 1;
        };
      };

      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        serviceConfig.StateDirectory = "apache-kafka";
      };
    };

    # Per-node settings: the cluster ID and the controller quorum, which
    # consists solely of the node itself.
    extraKafkaConfig = {
      kafkaa1 = {
        services.apache-kafka = {
          clusterId = clusterAId;

          settings = {
            "node.id" = 1;
            "controller.quorum.voters" = [ "1@kafkaa1:9093" ];
          };
        };
      };

      kafkab1 = {
        services.apache-kafka = {
          clusterId = clusterBId;

          settings = {
            "node.id" = 1;
            "controller.quorum.voters" = [ "1@kafkab1:9093" ];
          };
        };
      };
    };

    # Merge the shared configuration into every per-node configuration.
    kafkaNodes = builtins.mapAttrs (
      _: nodeConfig:
      mkMerge [
        nodeConfig
        kafkaConfig
      ]
    ) extraKafkaConfig;

    # MirrorMaker 2 properties file: A->B enabled for all topics, B->A disabled.
    mirrorMakerProperties = pkgs.writeText "mm2.properties" ''
      name = A->B

      clusters = A, B

      A.bootstrap.servers = kafkaa1:9092
      B.bootstrap.servers = kafkab1:9092

      A->B.enabled = true
      A->B.topics = .*

      B->A.enabled = false
      B->A.topics = .*

      replication.factor=1
      replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

      tasks.max = 2
      refresh.topics.enabled = true
      refresh.topics.interval.seconds = 5
      sync.topic.configs.enabled = true

      checkpoints.topic.replication.factor=1
      heartbeats.topic.replication.factor=1
      offset-syncs.topic.replication.factor=1

      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      config.storage.replication.factor=1

      emit.checkpoints.enabled = true
      emit.checkpoints.interval.seconds = 5
    '';
  in
  {
    name = "kafka-mirrormaker";
    meta.maintainers = [ lib.maintainers.jpds ];

    nodes = {
      inherit (kafkaNodes) kafkaa1 kafkab1;

      mirrormaker = {
        virtualisation.diskSize = 1024;
        virtualisation.memorySize = 1024 * 2;

        # Define a mirrormaker systemd service
        systemd.services.kafka-connect-mirror-maker = {
          after = [ "network-online.target" ];
          requires = [ "network-online.target" ];
          wantedBy = [ "multi-user.target" ];

          serviceConfig = {
            ExecStart = ''
              ${pkgs.apacheKafka}/bin/connect-mirror-maker.sh ${mirrorMakerProperties}
            '';
            # Restart on failure, e.g. if it starts before the brokers are reachable.
            Restart = "on-failure";
            RestartSec = "5s";
          };
        };
      };
    };

    # Shell commands containing backslashes are written as raw Python strings
    # so that grep escapes such as \[ and \+ are not parsed as (invalid)
    # Python escape sequences.
    testScript = ''
      import json

      for machine in kafkaa1, kafkab1:
          machine.wait_for_unit("apache-kafka")

      for machine in kafkaa1, kafkab1:
          machine.wait_for_open_port(9092)
          machine.wait_for_open_port(9093)

          machine.wait_until_succeeds(
              "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
          )

          machine.wait_until_succeeds(
              "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
          )

      # Each cluster must report exactly one controller quorum voter.
      for machine in kafkaa1, kafkab1:
          current_voters_json = machine.wait_until_succeeds(
              f"kafka-metadata-quorum.sh --bootstrap-server {machine.name}:9092 describe --status | grep CurrentVoters"
          ).replace("CurrentVoters:", "")

          voters = json.loads(current_voters_json)

          assert len(voters) == 1

      mirrormaker.wait_for_unit("kafka-connect-mirror-maker")

      mirrormaker.wait_until_succeeds(
          "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Kafka MirrorMaker initializing'"
      )
      mirrormaker.wait_until_succeeds(
          r"journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Targeting clusters \[A, B\]'"
      )
      mirrormaker.wait_until_succeeds(
          r"journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[Worker clientId=A->B, groupId=A-mm2\] Finished starting connectors and tasks'"
      )

      # NOTE(review): in GNU grep basic regular expressions \| is alternation,
      # so this pattern matches a line containing any one of the fragments
      # between the \| separators rather than one line with literal pipes.
      # Kept unchanged; confirm whether a literal | was intended.
      mirrormaker.wait_until_succeeds(
          r"""
            journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[MirrorSourceConnector\|task-0\] \[Producer clientId=A->B\|A->B-0\|offset-syncs-source-producer\] Cluster ID: ${clusterAId}'
          """
      )

      kafkaa1.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group B-mm2'"
      )

      kafkab1.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group A-mm2'"
      )

      kafkaa1.wait_until_succeeds(
          "kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mm-1 --partitions 1 --replication-factor 1"
      )

      # The topic only shows up on cluster B after MirrorMaker's next topic
      # refresh, so poll instead of asserting once.
      for machine in kafkaa1, kafkab1:
          machine.wait_until_succeeds(
              "kafka-topics.sh --bootstrap-server localhost:9092 --list | grep 'test-mm-1'"
          )

      mirrormaker.wait_until_succeeds(
          r"journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'replicating [[:digit:]]\+ topic-partitions A->B: \[test-mm-1-0\]'"
      )

      mirrormaker.wait_until_succeeds(
          r"journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Found [[:digit:]]\+ new topic-partitions on A'"
      )

      # Produce 100 messages on cluster A ...
      kafkaa1.wait_until_succeeds(
          "kafka-verifiable-producer.sh --bootstrap-server kafkaa1:9092 --throughput 10 --max-messages 100 --topic test-mm-1"
      )

      mirrormaker.wait_until_succeeds(
          r"journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Committing offsets for [[:digit:]]\+ acknowledged messages'"
      )

      # ... and consume 100 messages from the same topic name on cluster B.
      kafkab1.wait_until_succeeds(
          "kafka-verifiable-consumer.sh --bootstrap-server kafkab1:9092 --topic test-mm-1 --group-id testreplication --max-messages 100"
      )
    '';
  }
)