# NixOS VM test: ClickHouse consuming JSON rows from a Kafka topic.
#
# Arguments:
#   pkgs    - package set; used here for writeText, apacheKafka, jq and
#             lib.maintainers.
#   package - the ClickHouse package under test; forwarded to
#             services.clickhouse.package.
#
# Two machines are defined: `kafka` (a single Kafka node acting as both broker
# and controller) and `clickhouse` (a ClickHouse server with a named collection
# pointing at that broker). The test script publishes three JSON rows and checks
# that they reach a MergeTree table through a materialized view.
{ pkgs, package, ... }:

let
  # ClickHouse server configuration fragment declaring the named collection
  # `cluster_1`, which the Kafka engine table in the test script refers to.
  kafkaNamedCollectionConfig = ''
    <clickhouse>
      <named_collections>
        <cluster_1>
          <!-- ClickHouse Kafka engine parameters -->
          <kafka_broker_list>kafka:9092</kafka_broker_list>
          <kafka_topic_list>test_topic</kafka_topic_list>
          <kafka_group_name>clickhouse</kafka_group_name>
          <kafka_format>JSONEachRow</kafka_format>
          <kafka_commit_every_batch>0</kafka_commit_every_batch>
          <kafka_num_consumers>1</kafka_num_consumers>
          <kafka_thread_per_consumer>1</kafka_thread_per_consumer>

          <!-- Kafka extended configuration -->
          <kafka>
            <debug>all</debug>
            <auto_offset_reset>earliest</auto_offset_reset>
          </kafka>
        </cluster_1>
      </named_collections>
    </clickhouse>
  '';

  # The fragment above as a store file, installed into the server's config.d.
  kafkaNamedCollection = pkgs.writeText "kafka.xml" kafkaNamedCollectionConfig;
in
{
  name = "clickhouse-kafka";
  meta.maintainers = with pkgs.lib.maintainers; [
    jpds
    thevar1able
  ];

  nodes = {
    # ClickHouse server with the Kafka named collection installed.
    clickhouse = {
      environment.etc = {
        "clickhouse-server/config.d/kafka.xml" = {
          # The derivation is passed directly; no string interpolation needed.
          source = kafkaNamedCollection;
        };
      };

      services.clickhouse = {
        enable = true;
        inherit package;
      };
      virtualisation.memorySize = 4096;
    };

    # Single Kafka node: client listener on 9092, controller listener on 9093.
    kafka = {
      networking.firewall.allowedTCPPorts = [
        9092
        9093
      ];

      # kafka-console-producer.sh and jq are invoked from the test script.
      environment.systemPackages = [
        pkgs.apacheKafka
        pkgs.jq
      ];

      services.apache-kafka = {
        enable = true;

        # Randomly generated uuid. You can get one by running:
        # kafka-storage.sh random-uuid
        clusterId = "b81s-MuGSwyt_B9_h37wtQ";

        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          # The only quorum voter is this node (node.id = 1) on the
          # CONTROLLER listener port.
          "controller.quorum.voters" = [
            "1@kafka:9093"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];

          "node.id" = 1;
          "broker.rack" = 1;

          "process.roles" = [
            "broker"
            "controller"
          ];

          # Must agree with the StateDirectory configured below.
          "log.dirs" = [ "/var/lib/apache-kafka" ];
          "num.partitions" = 1;
          # Replication factors / min ISR of 1 because there is only one broker.
          "offsets.topic.replication.factor" = 1;
          "transaction.state.log.replication.factor" = 1;
          "transaction.state.log.min.isr" = 1;
        };
      };

      systemd.services.apache-kafka.serviceConfig.StateDirectory = "apache-kafka";

      virtualisation.memorySize = 1024 * 2;
    };
  };

  testScript =
    let
      # Three rows; the ages add up to 32 + 30 + 12 = 74, which is the value
      # the final query is checked against.
      jsonTestMessage = pkgs.writeText "kafka-test-data.json" ''
        { "id": 1, "first_name": "Fred", "age": 32 }
        { "id": 2, "first_name": "Barbara", "age": 30 }
        { "id": 3, "first_name": "Nicola", "age": 12 }
      '';

      # The SQL statements live in store files to work around the
      # quote/substitution complexity of Nix, Python, bash and SQL.

      # Kafka engine table configured through the `cluster_1` named collection.
      tableKafkaDDL = pkgs.writeText "ddl-kafka.sql" ''
        CREATE TABLE `test_kafka_topic` (
          `id` UInt32,
          `first_name` String,
          `age` UInt32
        ) ENGINE = Kafka(cluster_1);
      '';

      # Destination table that the materialized view writes into.
      tableDDL = pkgs.writeText "ddl.sql" ''
        CREATE TABLE `test_topic` (
          `id` UInt32,
          `first_name` String,
          `age` UInt32
        ) ENGINE = MergeTree ORDER BY id;
      '';

      # Materialized view copying rows from the Kafka engine table into
      # `test_topic`. The select list has no trailing comma so the statement
      # does not depend on parser leniency of the ClickHouse version under test.
      viewDDL = pkgs.writeText "view.sql" ''
        CREATE MATERIALIZED VIEW kafka_view TO test_topic AS
        SELECT
          id,
          first_name,
          age
        FROM test_kafka_topic;
      '';

      selectQuery = pkgs.writeText "select.sql" "SELECT sum(age) from `test_topic`";
    in
    ''
      kafka.start()
      kafka.wait_for_unit("apache-kafka")
      kafka.wait_for_open_port(9092)

      clickhouse.start()
      clickhouse.wait_for_unit("clickhouse")
      clickhouse.wait_for_open_port(9000)

      # The server must have merged the named collection config before the
      # Kafka engine table that references it is created.
      clickhouse.wait_until_succeeds(
        """
          journalctl -o cat -u clickhouse.service | grep "Merging configuration file '/etc/clickhouse-server/config.d/kafka.xml'"
        """
      )

      clickhouse.succeed(
        "cat ${tableKafkaDDL} | clickhouse-client"
      )

      clickhouse.succeed(
        "cat ${tableDDL} | clickhouse-client"
      )

      clickhouse.succeed(
        "cat ${viewDDL} | clickhouse-client"
      )

      # jq -c emits each JSON object on a single line before it is piped to
      # the console producer.
      kafka.succeed(
        "jq -rc . ${jsonTestMessage} | kafka-console-producer.sh --topic test_topic --bootstrap-server kafka:9092"
      )

      kafka.wait_until_succeeds(
        "journalctl -o cat -u apache-kafka.service | grep 'Created a new member id ClickHouse-clickhouse-default-test_kafka_topic'"
      )

      # Match the whole output line so that only the exact sum 74 passes
      # (a plain substring match would also accept e.g. 174 or 740).
      clickhouse.wait_until_succeeds(
        "cat ${selectQuery} | clickhouse-client | grep -x 74"
      )
    '';
}