nixos/clickhouse: Added Kafka engine/Keeper/S3 testcase (#359653)

Changed files
+486 -2
nixos
+1 -1
nixos/tests/all-tests.nix
···
cinnamon-wayland = runTest ./cinnamon-wayland.nix;
cjdns = runTest ./cjdns.nix;
clatd = runTest ./clatd.nix;
-
clickhouse = runTest ./clickhouse.nix;
cloud-init = handleTest ./cloud-init.nix { };
cloud-init-hostname = handleTest ./cloud-init-hostname.nix { };
cloudlog = runTest ./cloudlog.nix;
···
cinnamon-wayland = runTest ./cinnamon-wayland.nix;
cjdns = runTest ./cjdns.nix;
clatd = runTest ./clatd.nix;
+
clickhouse = import ./clickhouse { inherit runTest; };
cloud-init = handleTest ./cloud-init.nix { };
cloud-init-hostname = handleTest ./cloud-init-hostname.nix { };
cloudlog = runTest ./cloudlog.nix;
+1 -1
nixos/tests/clickhouse.nix nixos/tests/clickhouse/base.nix
···
{ pkgs, ... }:
{
name = "clickhouse";
-
meta.maintainers = with pkgs.lib.maintainers; [ ];
nodes.machine = {
services.clickhouse.enable = true;
···
{ pkgs, ... }:
{
name = "clickhouse";
+
meta.maintainers = with pkgs.lib.maintainers; [ jpds ];
nodes.machine = {
services.clickhouse.enable = true;
+8
nixos/tests/clickhouse/default.nix
···
···
+
{ runTest }:
+
+
{
+
base = runTest ./base.nix;
+
kafka = runTest ./kafka.nix;
+
keeper = runTest ./keeper.nix;
+
s3 = runTest ./s3.nix;
+
}
+172
nixos/tests/clickhouse/kafka.nix
···
···
+
{ pkgs, ... }:
+
+
let
+
kafkaNamedCollectionConfig = ''
+
<clickhouse>
+
<named_collections>
+
<cluster_1>
+
<!-- ClickHouse Kafka engine parameters -->
+
<kafka_broker_list>kafka:9092</kafka_broker_list>
+
<kafka_topic_list>test_topic</kafka_topic_list>
+
<kafka_group_name>clickhouse</kafka_group_name>
+
<kafka_format>JSONEachRow</kafka_format>
+
<kafka_commit_every_batch>0</kafka_commit_every_batch>
+
<kafka_num_consumers>1</kafka_num_consumers>
+
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
+
+
<!-- Kafka extended configuration -->
+
<kafka>
+
<debug>all</debug>
+
<auto_offset_reset>earliest</auto_offset_reset>
+
</kafka>
+
</cluster_1>
+
</named_collections>
+
</clickhouse>
+
'';
+
+
kafkaNamedCollection = pkgs.writeText "kafka.xml" kafkaNamedCollectionConfig;
+
in
+
{
+
name = "clickhouse-kafka";
+
meta.maintainers = with pkgs.lib.maintainers; [ jpds ];
+
+
nodes = {
+
clickhouse = {
+
environment.etc = {
+
"clickhouse-server/config.d/kafka.xml" = {
+
source = "${kafkaNamedCollection}";
+
};
+
};
+
+
services.clickhouse.enable = true;
+
virtualisation.memorySize = 4096;
+
};
+
+
kafka = {
+
networking.firewall.allowedTCPPorts = [
+
9092
+
9093
+
];
+
+
environment.systemPackages = [
+
pkgs.apacheKafka
+
pkgs.jq
+
];
+
+
services.apache-kafka = {
+
enable = true;
+
+
# Randomly generated uuid. You can get one by running:
+
# kafka-storage.sh random-uuid
+
clusterId = "b81s-MuGSwyt_B9_h37wtQ";
+
+
formatLogDirs = true;
+
+
settings = {
+
listeners = [
+
"PLAINTEXT://:9092"
+
"CONTROLLER://:9093"
+
];
+
"listener.security.protocol.map" = [
+
"PLAINTEXT:PLAINTEXT"
+
"CONTROLLER:PLAINTEXT"
+
];
+
"controller.quorum.voters" = [
+
"1@kafka:9093"
+
];
+
"controller.listener.names" = [ "CONTROLLER" ];
+
+
"node.id" = 1;
+
"broker.rack" = 1;
+
+
"process.roles" = [
+
"broker"
+
"controller"
+
];
+
+
"log.dirs" = [ "/var/lib/apache-kafka" ];
+
"num.partitions" = 1;
+
"offsets.topic.replication.factor" = 1;
+
"transaction.state.log.replication.factor" = 1;
+
"transaction.state.log.min.isr" = 1;
+
};
+
};
+
+
systemd.services.apache-kafka.serviceConfig.StateDirectory = "apache-kafka";
+
};
+
};
+
+
testScript =
+
let
+
jsonTestMessage = pkgs.writeText "kafka-test-data.json" ''
+
{ "id": 1, "first_name": "Fred", "age": 32 }
+
{ "id": 2, "first_name": "Barbara", "age": 30 }
+
{ "id": 3, "first_name": "Nicola", "age": 12 }
+
'';
+
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
+
tableKafkaDDL = pkgs.writeText "ddl-kafka.sql" ''
+
CREATE TABLE `test_kafka_topic` (
+
`id` UInt32,
+
`first_name` String,
+
`age` UInt32
+
) ENGINE = Kafka(cluster_1);
+
'';
+
+
tableDDL = pkgs.writeText "ddl.sql" ''
+
CREATE TABLE `test_topic` (
+
`id` UInt32,
+
`first_name` String,
+
`age` UInt32
+
) ENGINE = MergeTree ORDER BY id;
+
'';
+
+
viewDDL = pkgs.writeText "view.sql" ''
+
CREATE MATERIALIZED VIEW kafka_view TO test_topic AS
+
SELECT
+
id,
+
first_name,
+
age,
+
FROM test_kafka_topic;
+
'';
+
selectQuery = pkgs.writeText "select.sql" "SELECT sum(age) from `test_topic`";
+
in
+
''
+
kafka.start()
+
kafka.wait_for_unit("apache-kafka")
+
kafka.wait_for_open_port(9092)
+
+
clickhouse.start()
+
clickhouse.wait_for_unit("clickhouse")
+
clickhouse.wait_for_open_port(9000)
+
+
clickhouse.wait_until_succeeds(
+
"""
+
journalctl -o cat -u clickhouse.service | grep "Merging configuration file '/etc/clickhouse-server/config.d/kafka.xml'"
+
"""
+
)
+
+
clickhouse.succeed(
+
"cat ${tableKafkaDDL} | clickhouse-client"
+
)
+
+
clickhouse.succeed(
+
"cat ${tableDDL} | clickhouse-client"
+
)
+
+
clickhouse.succeed(
+
"cat ${viewDDL} | clickhouse-client"
+
)
+
+
kafka.succeed(
+
"jq -rc . ${jsonTestMessage} | kafka-console-producer.sh --topic test_topic --bootstrap-server kafka:9092"
+
)
+
+
kafka.wait_until_succeeds(
+
"journalctl -o cat -u apache-kafka.service | grep 'Created a new member id ClickHouse-clickhouse-default-test_kafka_topic'"
+
)
+
+
clickhouse.wait_until_succeeds(
+
"cat ${selectQuery} | clickhouse-client | grep 74"
+
)
+
'';
+
}
+183
nixos/tests/clickhouse/keeper.nix
···
···
+
{ lib, pkgs, ... }:
+
rec {
+
name = "clickhouse-keeper";
+
meta.maintainers = with pkgs.lib.maintainers; [ jpds ];
+
+
nodes =
+
let
+
node = i: {
+
+
environment.etc = {
+
"clickhouse-server/config.d/cluster.xml".text = ''
+
<clickhouse>
+
<remote_servers>
+
<perftest_2shards_1replicas>
+
${lib.concatStrings (
+
lib.imap0 (j: name: ''
+
<shard>
+
<replica>
+
<host>${name}</host>
+
<port>9000</port>
+
</replica>
+
</shard>
+
'') (builtins.attrNames nodes)
+
)}
+
</perftest_2shards_1replicas>
+
</remote_servers>
+
</clickhouse>
+
'';
+
+
"clickhouse-server/config.d/keeper.xml".text = ''
+
<clickhouse>
+
<keeper_server>
+
<server_id>${toString i}</server_id>
+
<tcp_port>9181</tcp_port>
+
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+
+
<coordination_settings>
+
<operation_timeout_ms>10000</operation_timeout_ms>
+
<session_timeout_ms>30000</session_timeout_ms>
+
<raft_logs_level>trace</raft_logs_level>
+
<rotate_log_storage_interval>10000</rotate_log_storage_interval>
+
</coordination_settings>
+
+
<raft_configuration>
+
${lib.concatStrings (
+
lib.imap1 (j: name: ''
+
<server>
+
<id>${toString j}</id>
+
<hostname>${name}</hostname>
+
<port>9444</port>
+
</server>
+
'') (builtins.attrNames nodes)
+
)}
+
</raft_configuration>
+
</keeper_server>
+
+
<zookeeper>
+
${lib.concatStrings (
+
lib.imap0 (j: name: ''
+
<node>
+
<host>${name}</host>
+
<port>9181</port>
+
</node>
+
'') (builtins.attrNames nodes)
+
)}
+
</zookeeper>
+
+
<distributed_ddl>
+
<path>/clickhouse/testcluster/task_queue/ddl</path>
+
</distributed_ddl>
+
</clickhouse>
+
'';
+
+
"clickhouse-server/config.d/listen.xml".text = ''
+
<clickhouse>
+
<listen_host>::</listen_host>
+
</clickhouse>
+
'';
+
+
"clickhouse-server/config.d/macros.xml".text = ''
+
<clickhouse>
+
<macros>
+
<replica>${toString i}</replica>
+
<cluster>perftest_2shards_1replicas</cluster>
+
</macros>
+
</clickhouse>
+
'';
+
};
+
+
networking.firewall.allowedTCPPorts = [
+
9009
+
9181
+
9444
+
];
+
+
services.clickhouse.enable = true;
+
+
systemd.services.clickhouse = {
+
after = [ "network-online.target" ];
+
requires = [ "network-online.target" ];
+
};
+
+
virtualisation.memorySize = 1024 * 4;
+
virtualisation.diskSize = 1024 * 10;
+
};
+
in
+
{
+
clickhouse1 = node 1;
+
clickhouse2 = node 2;
+
};
+
+
testScript =
+
let
+
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
+
clustersQuery = pkgs.writeText "clusters.sql" "SHOW clusters";
+
keeperQuery = pkgs.writeText "keeper.sql" "SELECT * FROM system.zookeeper WHERE path IN ('/', '/clickhouse') FORMAT VERTICAL";
+
systemClustersQuery = pkgs.writeText "system-clusters.sql" "SELECT host_name, host_address, replica_num FROM system.clusters WHERE cluster = 'perftest_2shards_1replicas'";
+
+
tableDDL = pkgs.writeText "table.sql" ''
+
CREATE TABLE test ON cluster 'perftest_2shards_1replicas' ( A Int64, S String)
+
Engine = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{database}/{table}', '{replica}')
+
ORDER BY A;
+
'';
+
+
insertDDL = pkgs.writeText "insert.sql" "
+
INSERT INTO test SELECT number, '' FROM numbers(100000000);
+
";
+
+
selectCountQuery = pkgs.writeText "select-count.sql" "
+
select count() from test;
+
";
+
in
+
''
+
clickhouse1.start()
+
clickhouse2.start()
+
+
for machine in clickhouse1, clickhouse2:
+
machine.wait_for_unit("clickhouse.service")
+
machine.wait_for_open_port(9000)
+
machine.wait_for_open_port(9009)
+
machine.wait_for_open_port(9181)
+
machine.wait_for_open_port(9444)
+
+
machine.wait_until_succeeds(
+
"""
+
journalctl -o cat -u clickhouse.service | grep "Merging configuration file '/etc/clickhouse-server/config.d/keeper.xml'"
+
"""
+
)
+
+
machine.log(machine.succeed(
+
"cat ${clustersQuery} | clickhouse-client | grep perftest_2shards_1replicas"
+
))
+
+
machine.log(machine.succeed(
+
"cat ${keeperQuery} | clickhouse-client"
+
))
+
+
machine.succeed(
+
"cat ${systemClustersQuery} | clickhouse-client | grep clickhouse1"
+
)
+
machine.succeed(
+
"cat ${systemClustersQuery} | clickhouse-client | grep clickhouse2"
+
)
+
+
machine.succeed(
+
"ls /var/lib/clickhouse/coordination/log | grep changelog"
+
)
+
+
clickhouse2.succeed(
+
"cat ${tableDDL} | clickhouse-client"
+
)
+
+
clickhouse2.succeed(
+
"cat ${insertDDL} | clickhouse-client"
+
)
+
+
for machine in clickhouse1, clickhouse2:
+
machine.wait_until_succeeds(
+
"cat ${selectCountQuery} | clickhouse-client | grep 100000000"
+
)
+
'';
+
}
+121
nixos/tests/clickhouse/s3.nix
···
···
+
{ pkgs, ... }:
+
+
let
+
s3 = {
+
bucket = "clickhouse-bucket";
+
accessKey = "BKIKJAA5BMMU2RHO6IBB";
+
secretKey = "V7f1CwQqAcwo80UEIJEjc5gVQUSSx5ohQ9GSrr12";
+
};
+
+
clickhouseS3StorageConfig = ''
+
<clickhouse>
+
<storage_configuration>
+
<disks>
+
<s3_disk>
+
<type>s3</type>
+
<endpoint>http://minio:9000/${s3.bucket}/</endpoint>
+
<access_key_id>${s3.accessKey}</access_key_id>
+
<secret_access_key>${s3.secretKey}</secret_access_key>
+
<metadata_path>/var/lib/clickhouse/disks/s3_disk/</metadata_path>
+
</s3_disk>
+
<s3_cache>
+
<type>cache</type>
+
<disk>s3_disk</disk>
+
<path>/var/lib/clickhouse/disks/s3_cache/</path>
+
<max_size>10Gi</max_size>
+
</s3_cache>
+
</disks>
+
<policies>
+
<s3_main>
+
<volumes>
+
<main>
+
<disk>s3_disk</disk>
+
</main>
+
</volumes>
+
</s3_main>
+
</policies>
+
</storage_configuration>
+
</clickhouse>
+
'';
+
in
+
{
+
name = "clickhouse-s3";
+
meta.maintainers = with pkgs.lib.maintainers; [ jpds ];
+
+
nodes = {
+
clickhouse = {
+
environment.etc = {
+
"clickhouse-server/config.d/s3.xml" = {
+
text = "${clickhouseS3StorageConfig}";
+
};
+
};
+
+
services.clickhouse.enable = true;
+
virtualisation.diskSize = 15 * 1024;
+
virtualisation.memorySize = 4 * 1024;
+
};
+
+
minio =
+
{ pkgs, ... }:
+
{
+
virtualisation.diskSize = 2 * 1024;
+
networking.firewall.allowedTCPPorts = [ 9000 ];
+
+
services.minio = {
+
enable = true;
+
inherit (s3) accessKey secretKey;
+
};
+
+
environment.systemPackages = [ pkgs.minio-client ];
+
};
+
};
+
+
testScript =
+
let
+
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
+
tableDDL = pkgs.writeText "ddl.sql" ''
+
CREATE TABLE `demo` (
+
`value` String
+
)
+
ENGINE = MergeTree
+
ORDER BY value
+
SETTINGS storage_policy = 's3_main';
+
'';
+
insertQuery = pkgs.writeText "insert.sql" "INSERT INTO `demo` (`value`) VALUES ('foo');";
+
selectQuery = pkgs.writeText "select.sql" "SELECT * from `demo`";
+
in
+
''
+
minio.wait_for_unit("minio")
+
minio.wait_for_open_port(9000)
+
minio.succeed(
+
"mc alias set minio "
+
+ "http://localhost:9000 "
+
+ "${s3.accessKey} ${s3.secretKey} --api s3v4",
+
"mc mb minio/${s3.bucket}",
+
)
+
+
clickhouse.start()
+
clickhouse.wait_for_unit("clickhouse.service")
+
clickhouse.wait_for_open_port(9000)
+
+
clickhouse.wait_until_succeeds(
+
"""
+
journalctl -o cat -u clickhouse.service | grep "Merging configuration file '/etc/clickhouse-server/config.d/s3.xml'"
+
"""
+
)
+
+
clickhouse.succeed(
+
"cat ${tableDDL} | clickhouse-client"
+
)
+
clickhouse.succeed(
+
"cat ${insertQuery} | clickhouse-client"
+
)
+
clickhouse.succeed(
+
"cat ${selectQuery} | clickhouse-client | grep foo"
+
)
+
+
minio.log(minio.succeed(
+
"mc ls minio/${s3.bucket}",
+
))
+
'';
+
}