1{ pkgs, ... }:
2
3with pkgs.lib;
4
5let
6 makeKafkaTest =
7 name:
8 {
9 kafkaPackage,
10 mode ? "kraft",
11 }:
12 (import ../make-test-python.nix ({
13 inherit name;
14 meta = with pkgs.lib.maintainers; {
15 maintainers = [ nequissimus ];
16 };
17
18 nodes = {
19 kafka =
20 { ... }:
21 {
22 services.apache-kafka = mkMerge [
23 ({
24 enable = true;
25 package = kafkaPackage;
26 settings = {
27 "offsets.topic.replication.factor" = 1;
28 "log.dirs" = [
29 "/var/lib/kafka/logdir1"
30 "/var/lib/kafka/logdir2"
31 ];
32 };
33 })
34 (mkIf (mode == "zookeeper") {
35 settings = {
36 "zookeeper.session.timeout.ms" = 600000;
37 "zookeeper.connect" = [ "zookeeper1:2181" ];
38 };
39 })
40 (mkIf (mode == "kraft") {
41 clusterId = "ak2fIHr4S8WWarOF_ODD0g";
42 formatLogDirs = true;
43 settings = {
44 "node.id" = 1;
45 "process.roles" = [
46 "broker"
47 "controller"
48 ];
49 "listeners" = [
50 "PLAINTEXT://:9092"
51 "CONTROLLER://:9093"
52 ];
53 "listener.security.protocol.map" = [
54 "PLAINTEXT:PLAINTEXT"
55 "CONTROLLER:PLAINTEXT"
56 ];
57 "controller.quorum.voters" = [
58 "1@kafka:9093"
59 ];
60 "controller.listener.names" = [ "CONTROLLER" ];
61 };
62 })
63 ];
64
65 networking.firewall.allowedTCPPorts = [
66 9092
67 9093
68 ];
69 virtualisation.diskSize = 1024;
70 # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
71 virtualisation.memorySize = 2047;
72 };
73 }
74 // optionalAttrs (mode == "zookeeper") {
75 zookeeper1 =
76 { ... }:
77 {
78 services.zookeeper = {
79 enable = true;
80 };
81
82 networking.firewall.allowedTCPPorts = [ 2181 ];
83 virtualisation.diskSize = 1024;
84 };
85 };
86
87 testScript = ''
88 start_all()
89
90 ${optionalString (mode == "zookeeper") ''
91 zookeeper1.wait_for_unit("default.target")
92 zookeeper1.wait_for_unit("zookeeper.service")
93 zookeeper1.wait_for_open_port(2181)
94 ''}
95
96 kafka.wait_for_unit("default.target")
97 kafka.wait_for_unit("apache-kafka.service")
98 kafka.wait_for_open_port(9092)
99
100 kafka.wait_until_succeeds(
101 "${kafkaPackage}/bin/kafka-topics.sh --create "
102 + "--bootstrap-server localhost:9092 --partitions 1 "
103 + "--replication-factor 1 --topic testtopic"
104 )
105 kafka.succeed(
106 "echo 'test 1' | "
107 + "${kafkaPackage}/bin/kafka-console-producer.sh "
108 + "--bootstrap-server localhost:9092 --topic testtopic"
109 )
110 assert "test 1" in kafka.succeed(
111 "${kafkaPackage}/bin/kafka-console-consumer.sh "
112 + "--bootstrap-server localhost:9092 --topic testtopic "
113 + "--from-beginning --max-messages 1"
114 )
115 '';
116 }));
117
118in
119with pkgs;
120{
121 kafka_3_7 = makeKafkaTest "kafka_3_7" {
122 kafkaPackage = apacheKafka_3_7;
123 mode = "zookeeper";
124 };
125 kafka_3_8 = makeKafkaTest "kafka_3_8" {
126 kafkaPackage = apacheKafka_3_8;
127 mode = "zookeeper";
128 };
129 kafka_3_9 = makeKafkaTest "kafka_3_9" {
130 kafkaPackage = apacheKafka_3_9;
131 mode = "zookeeper";
132 };
133 kafka_4_0 = makeKafkaTest "kafka_4_0" { kafkaPackage = apacheKafka_4_0; };
134 kafka_4_1 = makeKafkaTest "kafka_4_1" { kafkaPackage = apacheKafka_4_1; };
135 kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
136}