1{ pkgs, ... }:
2
3with pkgs.lib;
4
5let
6 makeKafkaTest =
7 name:
8 {
9 kafkaPackage,
10 mode ? "kraft",
11 }:
12 (import ../make-test-python.nix ({
13 inherit name;
14 meta = with pkgs.lib.maintainers; {
15 maintainers = [ nequissimus ];
16 };
17
18 nodes =
19 {
20 kafka =
21 { ... }:
22 {
23 services.apache-kafka = mkMerge [
24 ({
25 enable = true;
26 package = kafkaPackage;
27 settings = {
28 "offsets.topic.replication.factor" = 1;
29 "log.dirs" = [
30 "/var/lib/kafka/logdir1"
31 "/var/lib/kafka/logdir2"
32 ];
33 };
34 })
35 (mkIf (mode == "zookeeper") {
36 settings = {
37 "zookeeper.session.timeout.ms" = 600000;
38 "zookeeper.connect" = [ "zookeeper1:2181" ];
39 };
40 })
41 (mkIf (mode == "kraft") {
42 clusterId = "ak2fIHr4S8WWarOF_ODD0g";
43 formatLogDirs = true;
44 settings = {
45 "node.id" = 1;
46 "process.roles" = [
47 "broker"
48 "controller"
49 ];
50 "listeners" = [
51 "PLAINTEXT://:9092"
52 "CONTROLLER://:9093"
53 ];
54 "listener.security.protocol.map" = [
55 "PLAINTEXT:PLAINTEXT"
56 "CONTROLLER:PLAINTEXT"
57 ];
58 "controller.quorum.voters" = [
59 "1@kafka:9093"
60 ];
61 "controller.listener.names" = [ "CONTROLLER" ];
62 };
63 })
64 ];
65
66 networking.firewall.allowedTCPPorts = [
67 9092
68 9093
69 ];
70 virtualisation.diskSize = 1024;
71 # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
72 virtualisation.memorySize = 2047;
73 };
74 }
75 // optionalAttrs (mode == "zookeeper") {
76 zookeeper1 =
77 { ... }:
78 {
79 services.zookeeper = {
80 enable = true;
81 };
82
83 networking.firewall.allowedTCPPorts = [ 2181 ];
84 virtualisation.diskSize = 1024;
85 };
86 };
87
88 testScript = ''
89 start_all()
90
91 ${optionalString (mode == "zookeeper") ''
92 zookeeper1.wait_for_unit("default.target")
93 zookeeper1.wait_for_unit("zookeeper.service")
94 zookeeper1.wait_for_open_port(2181)
95 ''}
96
97 kafka.wait_for_unit("default.target")
98 kafka.wait_for_unit("apache-kafka.service")
99 kafka.wait_for_open_port(9092)
100
101 kafka.wait_until_succeeds(
102 "${kafkaPackage}/bin/kafka-topics.sh --create "
103 + "--bootstrap-server localhost:9092 --partitions 1 "
104 + "--replication-factor 1 --topic testtopic"
105 )
106 kafka.succeed(
107 "echo 'test 1' | "
108 + "${kafkaPackage}/bin/kafka-console-producer.sh "
109 + "--bootstrap-server localhost:9092 --topic testtopic"
110 )
111 assert "test 1" in kafka.succeed(
112 "${kafkaPackage}/bin/kafka-console-consumer.sh "
113 + "--bootstrap-server localhost:9092 --topic testtopic "
114 + "--from-beginning --max-messages 1"
115 )
116 '';
117 }));
118
119in
120with pkgs;
121{
122 kafka_3_7 = makeKafkaTest "kafka_3_7" {
123 kafkaPackage = apacheKafka_3_7;
124 mode = "zookeeper";
125 };
126 kafka_3_8 = makeKafkaTest "kafka_3_8" {
127 kafkaPackage = apacheKafka_3_8;
128 mode = "zookeeper";
129 };
130 kafka_3_9 = makeKafkaTest "kafka_3_9" {
131 kafkaPackage = apacheKafka_3_9;
132 mode = "zookeeper";
133 };
134 kafka_4_0 = makeKafkaTest "kafka_4_0" { kafkaPackage = apacheKafka_4_0; };
135 kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
136}