at master 4.9 kB view raw
1{ pkgs, package, ... }: 2 3let 4 kafkaNamedCollectionConfig = '' 5 <clickhouse> 6 <named_collections> 7 <cluster_1> 8 <!-- ClickHouse Kafka engine parameters --> 9 <kafka_broker_list>kafka:9092</kafka_broker_list> 10 <kafka_topic_list>test_topic</kafka_topic_list> 11 <kafka_group_name>clickhouse</kafka_group_name> 12 <kafka_format>JSONEachRow</kafka_format> 13 <kafka_commit_every_batch>0</kafka_commit_every_batch> 14 <kafka_num_consumers>1</kafka_num_consumers> 15 <kafka_thread_per_consumer>1</kafka_thread_per_consumer> 16 17 <!-- Kafka extended configuration --> 18 <kafka> 19 <debug>all</debug> 20 <auto_offset_reset>earliest</auto_offset_reset> 21 </kafka> 22 </cluster_1> 23 </named_collections> 24 </clickhouse> 25 ''; 26 27 kafkaNamedCollection = pkgs.writeText "kafka.xml" kafkaNamedCollectionConfig; 28in 29{ 30 name = "clickhouse-kafka"; 31 meta.maintainers = with pkgs.lib.maintainers; [ 32 jpds 33 thevar1able 34 ]; 35 36 nodes = { 37 clickhouse = { 38 environment.etc = { 39 "clickhouse-server/config.d/kafka.xml" = { 40 source = "${kafkaNamedCollection}"; 41 }; 42 }; 43 44 services.clickhouse = { 45 enable = true; 46 inherit package; 47 }; 48 virtualisation.memorySize = 4096; 49 }; 50 51 kafka = { 52 networking.firewall.allowedTCPPorts = [ 53 9092 54 9093 55 ]; 56 57 environment.systemPackages = [ 58 pkgs.apacheKafka 59 pkgs.jq 60 ]; 61 62 services.apache-kafka = { 63 enable = true; 64 65 # Randomly generated uuid. You can get one by running: 66 # kafka-storage.sh random-uuid 67 clusterId = "b81s-MuGSwyt_B9_h37wtQ"; 68 69 formatLogDirs = true; 70 71 settings = { 72 listeners = [ 73 "PLAINTEXT://:9092" 74 "CONTROLLER://:9093" 75 ]; 76 "listener.security.protocol.map" = [ 77 "PLAINTEXT:PLAINTEXT" 78 "CONTROLLER:PLAINTEXT" 79 ]; 80 "controller.quorum.voters" = [ 81 "1@kafka:9093" 82 ]; 83 "controller.listener.names" = [ "CONTROLLER" ]; 84 85 "node.id" = 1; 86 "broker.rack" = 1; 87 88 "process.roles" = [ 89 "broker" 90 "controller" 91 ]; 92 93 "log.dirs" = [ "/var/lib/apache-kafka" ]; 94 "num.partitions" = 1; 95 "offsets.topic.replication.factor" = 1; 96 "transaction.state.log.replication.factor" = 1; 97 "transaction.state.log.min.isr" = 1; 98 }; 99 }; 100 101 systemd.services.apache-kafka.serviceConfig.StateDirectory = "apache-kafka"; 102 103 virtualisation.memorySize = 1024 * 2; 104 }; 105 }; 106 107 testScript = 108 let 109 jsonTestMessage = pkgs.writeText "kafka-test-data.json" '' 110 { "id": 1, "first_name": "Fred", "age": 32 } 111 { "id": 2, "first_name": "Barbara", "age": 30 } 112 { "id": 3, "first_name": "Nicola", "age": 12 } 113 ''; 114 # work around quote/substitution complexity by Nix, Perl, bash and SQL. 115 tableKafkaDDL = pkgs.writeText "ddl-kafka.sql" '' 116 CREATE TABLE `test_kafka_topic` ( 117 `id` UInt32, 118 `first_name` String, 119 `age` UInt32 120 ) ENGINE = Kafka(cluster_1); 121 ''; 122 123 tableDDL = pkgs.writeText "ddl.sql" '' 124 CREATE TABLE `test_topic` ( 125 `id` UInt32, 126 `first_name` String, 127 `age` UInt32 128 ) ENGINE = MergeTree ORDER BY id; 129 ''; 130 131 viewDDL = pkgs.writeText "view.sql" '' 132 CREATE MATERIALIZED VIEW kafka_view TO test_topic AS 133 SELECT 134 id, 135 first_name, 136 age, 137 FROM test_kafka_topic; 138 ''; 139 selectQuery = pkgs.writeText "select.sql" "SELECT sum(age) from `test_topic`"; 140 in 141 '' 142 kafka.start() 143 kafka.wait_for_unit("apache-kafka") 144 kafka.wait_for_open_port(9092) 145 146 clickhouse.start() 147 clickhouse.wait_for_unit("clickhouse") 148 clickhouse.wait_for_open_port(9000) 149 150 clickhouse.wait_until_succeeds( 151 """ 152 journalctl -o cat -u clickhouse.service | grep "Merging configuration file '/etc/clickhouse-server/config.d/kafka.xml'" 153 """ 154 ) 155 156 clickhouse.succeed( 157 "cat ${tableKafkaDDL} | clickhouse-client" 158 ) 159 160 clickhouse.succeed( 161 "cat ${tableDDL} | clickhouse-client" 162 ) 163 164 clickhouse.succeed( 165 "cat ${viewDDL} | clickhouse-client" 166 ) 167 168 kafka.succeed( 169 "jq -rc . ${jsonTestMessage} | kafka-console-producer.sh --topic test_topic --bootstrap-server kafka:9092" 170 ) 171 172 kafka.wait_until_succeeds( 173 "journalctl -o cat -u apache-kafka.service | grep 'Created a new member id ClickHouse-clickhouse-default-test_kafka_topic'" 174 ) 175 176 clickhouse.wait_until_succeeds( 177 "cat ${selectQuery} | clickhouse-client | grep 74" 178 ) 179 ''; 180}