1{ lib, pkgs, ... }:
2
3{
# Name of this test and the maintainer listed for it.
name = "vector-nginx-clickhouse";
meta = {
  maintainers = [ pkgs.lib.maintainers.happysalada ];
};
6
7 nodes = {
clickhouse =
  { config, pkgs, ... }:
  {
    # Memory given to this VM (the option is expressed in MiB).
    virtualisation.memorySize = 4096;

    # The ClickHouse module can't listen on a non-loopback IP, so only the
    # Vector source port (6000) is opened to other nodes.
    networking.firewall.allowedTCPPorts = [ 6000 ];

    services = {
      clickhouse.enable = true;

      # For now this exercises the Vector sink -> Vector source path: events
      # arrive on the `vector` source and are written to the local ClickHouse.
      vector = {
        enable = true;

        settings = {
          # Listen on all addresses, port 6000, for events from other Vector instances.
          sources.vector_source = {
            type = "vector";
            address = "[::]:6000";
          };

          # Insert received events into nginxdb.access_logs via the local HTTP endpoint.
          sinks.clickhouse = {
            type = "clickhouse";
            inputs = [ "vector_source" ];
            endpoint = "http://localhost:8123";
            database = "nginxdb";
            table = "access_logs";
            skip_unknown_fields = true;
          };
        };
      };
    };
  };
42
nginx =
  { config, pkgs, ... }:
  {
    services = {
      # nginx with a single, default-configured virtual host named "localhost".
      nginx = {
        enable = true;
        virtualHosts.localhost = { };
      };

      # Vector reads the nginx access log and forwards it to the clickhouse node.
      vector = {
        enable = true;

        settings = {
          sources.nginx_logs = {
            type = "file";
            include = [ "/var/log/nginx/access.log" ];
            read_from = "end";
          };

          # Ship events to the Vector source listening on clickhouse:6000.
          sinks.vector_sink = {
            type = "vector";
            inputs = [ "nginx_logs" ];
            address = "clickhouse:6000";
          };
        };
      };
    };

    # Run vector with the nginx group as a supplementary group
    # (presumably so it may read /var/log/nginx/access.log; confirm against log permissions).
    systemd.services.vector.serviceConfig.SupplementaryGroups = [ "nginx" ];
  };
77 };
78
testScript =
  let
    # Each SQL statement is written to its own store file and piped into
    # clickhouse-client, which avoids nested quoting/substitution issues
    # between Nix, the test script, bash and SQL.
    sql = {
      createDatabase = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";

      # Raw table: one row per log line, stored in a single `message` column.
      createTable = pkgs.writeText "table.sql" ''
        CREATE TABLE IF NOT EXISTS nginxdb.access_logs (
        message String
        )
        ENGINE = MergeTree()
        ORDER BY tuple()
      '';

      # Materialized view that splits `message` into typed columns.
      # Graciously taken from https://clickhouse.com/docs/en/integrations/vector
      createView = pkgs.writeText "table-view.sql" ''
        CREATE MATERIALIZED VIEW nginxdb.access_logs_view
        (
        RemoteAddr String,
        Client String,
        RemoteUser String,
        TimeLocal DateTime,
        RequestMethod String,
        Request String,
        HttpVersion String,
        Status Int32,
        BytesSent Int64,
        UserAgent String
        )
        ENGINE = MergeTree()
        ORDER BY RemoteAddr
        POPULATE AS
        WITH
        splitByWhitespace(message) as split,
        splitByRegexp('\S \d+ "([^"]*)"', message) as referer
        SELECT
        split[1] AS RemoteAddr,
        split[2] AS Client,
        split[3] AS RemoteUser,
        parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
        trim(LEADING '"' FROM split[6]) AS RequestMethod,
        split[7] AS Request,
        trim(TRAILING '"' FROM split[8]) AS HttpVersion,
        split[9] AS Status,
        split[10] AS BytesSent,
        trim(BOTH '"' from referer[2]) AS UserAgent
        FROM
        (SELECT message FROM nginxdb.access_logs)
      '';

      # Query used by the final assertion below.
      selectView = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";
    };
  in
  # Flow: bring up ClickHouse and its Vector listener, create the schema,
  # bring up nginx and its Vector shipper, issue two requests, then wait
  # until a row containing "curl" is visible through the view.
  ''
    clickhouse.wait_for_unit("clickhouse")
    clickhouse.wait_for_open_port(8123)

    clickhouse.wait_until_succeeds(
    "journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
    )

    clickhouse.wait_for_unit("vector")
    clickhouse.wait_for_open_port(6000)

    clickhouse.succeed(
    "cat ${sql.createDatabase} | clickhouse-client"
    )

    clickhouse.succeed(
    "cat ${sql.createTable} | clickhouse-client"
    )

    clickhouse.succeed(
    "cat ${sql.createView} | clickhouse-client"
    )

    nginx.wait_for_unit("nginx")
    nginx.wait_for_open_port(80)
    nginx.wait_for_unit("vector")
    nginx.wait_until_succeeds(
    "journalctl -o cat -u vector.service | grep 'Starting file server'"
    )

    nginx.succeed("curl http://localhost/")
    nginx.succeed("curl http://localhost/")

    nginx.wait_for_file("/var/log/nginx/access.log")
    nginx.wait_until_succeeds(
    "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
    )

    clickhouse.wait_until_succeeds(
    "cat ${sql.selectView} | clickhouse-client | grep 'curl'"
    )
  '';
172}