1import ../make-test-python.nix ({ lib, pkgs, ... }:
2
3{
# Name of this test and the maintainers responsible for it.
name = "vector-nginx-clickhouse";
meta = {
  maintainers = [ pkgs.lib.maintainers.happysalada ];
};
6
7 nodes = {
clickhouse = { config, pkgs, ... }:
  let
    # TCP port on which this node's Vector source accepts events; the same
    # port is opened in the firewall below.
    vectorPort = 6000;
  in
  {
    virtualisation.memorySize = 4096;

    # The ClickHouse module can't listen on a non-loopback IP, so only the
    # Vector port is exposed to other nodes; Vector then talks to ClickHouse
    # through localhost.
    networking.firewall.allowedTCPPorts = [ vectorPort ];

    services = {
      clickhouse.enable = true;

      # For now this exercises the Vector sink -> Vector source path.
      vector = {
        enable = true;

        # Receive events from a remote Vector instance.
        settings.sources.vector_source = {
          type = "vector";
          address = "[::]:${toString vectorPort}";
        };

        # Write everything received into nginxdb.access_logs via the local
        # ClickHouse HTTP endpoint.
        settings.sinks.clickhouse = {
          type = "clickhouse";
          inputs = [ "vector_source" ];
          endpoint = "http://localhost:8123";
          database = "nginxdb";
          table = "access_logs";
          skip_unknown_fields = true;
        };
      };
    };
  };
40
nginx = { config, pkgs, ... }: {
  services = {
    # Web server with a single default virtual host for "localhost".
    nginx = {
      enable = true;
      virtualHosts.localhost = { };
    };

    # Vector reads the nginx access log and ships it to the Vector instance
    # on the "clickhouse" node.
    vector = {
      enable = true;

      settings.sources.nginx_logs = {
        type = "file";
        include = [ "/var/log/nginx/access.log" ];
        read_from = "end";
      };

      settings.sinks.vector_sink = {
        type = "vector";
        inputs = [ "nginx_logs" ];
        address = "clickhouse:6000";
      };
    };
  };

  # Run Vector with the "nginx" group as a supplementary group
  # (presumably so it can read the access log -- confirm against the log
  # file's permissions).
  systemd.services.vector.serviceConfig.SupplementaryGroups = [ "nginx" ];
};
73 };
74
testScript =
  let
    # Every SQL statement lives in its own store file and is piped into the
    # client; this sidesteps the stacked quoting/substitution rules of Nix,
    # Python, bash and SQL.
    createDatabaseSql = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";

    # Raw landing table: one row per log line, stored in `message`.
    createTableSql = pkgs.writeText "table.sql" ''
      CREATE TABLE IF NOT EXISTS nginxdb.access_logs (
      message String
      )
      ENGINE = MergeTree()
      ORDER BY tuple()
    '';

    # Materialized view splitting each raw log line into columns.
    # Taken from https://clickhouse.com/docs/en/integrations/vector
    createViewSql = pkgs.writeText "table-view.sql" ''
      CREATE MATERIALIZED VIEW nginxdb.access_logs_view
      (
      RemoteAddr String,
      Client String,
      RemoteUser String,
      TimeLocal DateTime,
      RequestMethod String,
      Request String,
      HttpVersion String,
      Status Int32,
      BytesSent Int64,
      UserAgent String
      )
      ENGINE = MergeTree()
      ORDER BY RemoteAddr
      POPULATE AS
      WITH
      splitByWhitespace(message) as split,
      splitByRegexp('\S \d+ "([^"]*)"', message) as referer
      SELECT
      split[1] AS RemoteAddr,
      split[2] AS Client,
      split[3] AS RemoteUser,
      parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
      trim(LEADING '"' FROM split[6]) AS RequestMethod,
      split[7] AS Request,
      trim(TRAILING '"' FROM split[8]) AS HttpVersion,
      split[9] AS Status,
      split[10] AS BytesSent,
      trim(BOTH '"' from referer[2]) AS UserAgent
      FROM
      (SELECT message FROM nginxdb.access_logs)
    '';

    readViewSql = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";

    # Shell pipeline that feeds one SQL file to clickhouse-client on stdin.
    runSql = sqlFile: "cat ${sqlFile} | clickhouse-client";
  in
  # Flow: wait for ClickHouse and its Vector, create the schema, wait for
  # nginx and its Vector, issue two requests, then poll the view until a
  # row containing "curl" shows up.
  ''
    clickhouse.wait_for_unit("clickhouse")
    clickhouse.wait_for_open_port(8123)

    clickhouse.wait_until_succeeds(
    "journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
    )

    clickhouse.wait_for_unit("vector")
    clickhouse.wait_for_open_port(6000)

    clickhouse.succeed(
    "${runSql createDatabaseSql}"
    )

    clickhouse.succeed(
    "${runSql createTableSql}"
    )

    clickhouse.succeed(
    "${runSql createViewSql}"
    )

    nginx.wait_for_unit("nginx")
    nginx.wait_for_open_port(80)
    nginx.wait_for_unit("vector")
    nginx.wait_until_succeeds(
    "journalctl -o cat -u vector.service | grep 'Starting file server'"
    )

    nginx.succeed("curl http://localhost/")
    nginx.succeed("curl http://localhost/")

    nginx.wait_for_file("/var/log/nginx/access.log")
    nginx.wait_until_succeeds(
    "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
    )

    clickhouse.wait_until_succeeds(
    "${runSql readViewSql} | grep 'curl'"
    )
  '';
168})