1{ lib, pkgs, ... }:
2
3{
4 name = "vector-caddy-clickhouse";
5 meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
6
7 nodes = {
8 caddy =
9 { config, pkgs, ... }:
10 {
11 networking.firewall.allowedTCPPorts = [ 80 ];
12
13 services.caddy = {
14 enable = true;
15 virtualHosts = {
16 "http://caddy" = {
17 extraConfig = ''
18 encode gzip
19
20 file_server
21 root /srv
22 '';
23 logFormat = "
24 output file ${config.services.caddy.logDir}/access-caddy.log {
25 mode 0640
26 }
27 ";
28 };
29 };
30 };
31
32 systemd.services.vector.serviceConfig = {
33 SupplementaryGroups = [ "caddy" ];
34 };
35
36 services.vector = {
37 enable = true;
38
39 settings = {
40 sources = {
41 caddy-log = {
42 type = "file";
43 include = [ "/var/log/caddy/*.log" ];
44 };
45 };
46
47 transforms = {
48 caddy_logs_timestamp = {
49 type = "remap";
50 inputs = [ "caddy-log" ];
51 source = ''
52 .tmp_timestamp, err = parse_json!(.message).ts * 1000000
53
54 if err != null {
55 log("Unable to parse ts value: " + err, level: "error")
56 } else {
57 .timestamp = from_unix_timestamp!(to_int!(.tmp_timestamp), unit: "microseconds")
58 }
59
60 del(.tmp_timestamp)
61 '';
62 };
63 };
64
65 sinks = {
66 vector_sink = {
67 type = "vector";
68 inputs = [ "caddy_logs_timestamp" ];
69 address = "clickhouse:6000";
70 };
71 };
72 };
73 };
74 };
75
76 client =
77 { config, pkgs, ... }:
78 {
79 environment.systemPackages = [ pkgs.curl ];
80 };
81
82 clickhouse =
83 { config, pkgs, ... }:
84 {
85 virtualisation.memorySize = 4096;
86
87 networking.firewall.allowedTCPPorts = [ 6000 ];
88
89 services.vector = {
90 enable = true;
91
92 settings = {
93 sources = {
94 vector_source = {
95 type = "vector";
96 address = "[::]:6000";
97 };
98 };
99
100 sinks = {
101 clickhouse = {
102 type = "clickhouse";
103 inputs = [
104 "vector_source"
105 ];
106 endpoint = "http://localhost:8123";
107 database = "caddy";
108 table = "access_logs";
109 date_time_best_effort = true;
110 skip_unknown_fields = true;
111 };
112 };
113 };
114
115 };
116
117 services.clickhouse = {
118 enable = true;
119 };
120 };
121 };
122
123 testScript =
124 let
125 # work around quote/substitution complexity by Nix, Perl, bash and SQL.
126 databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS caddy";
127
128 tableDDL = pkgs.writeText "table.sql" ''
129 CREATE TABLE IF NOT EXISTS caddy.access_logs (
130 timestamp DateTime64(6),
131 host LowCardinality(String),
132 message String,
133 )
134 ENGINE = MergeTree()
135 ORDER BY timestamp
136 PARTITION BY toYYYYMM(timestamp)
137 '';
138
139 tableViewBase = pkgs.writeText "table-view-base.sql" ''
140 CREATE TABLE IF NOT EXISTS caddy.access_logs_view_base (
141 timestamp DateTime64(6),
142 host LowCardinality(String),
143 request JSON,
144 status UInt16,
145 )
146 ENGINE = MergeTree()
147 ORDER BY timestamp
148 PARTITION BY toYYYYMM(timestamp)
149 '';
150
151 tableView = pkgs.writeText "table-view.sql" ''
152 CREATE MATERIALIZED VIEW IF NOT EXISTS caddy.access_logs_view TO caddy.access_logs_view_base
153 AS SELECT
154 timestamp,
155 host,
156 simpleJSONExtractRaw(message, 'request') AS request,
157 simpleJSONExtractRaw(message, 'status') AS status
158 FROM caddy.access_logs;
159 '';
160
161 selectQuery = pkgs.writeText "select.sql" ''
162 SELECT
163 timestamp,
164 request.host,
165 request.remote_ip,
166 request.proto,
167 request.method,
168 request.uri,
169 status
170 FROM caddy.access_logs_view_base
171 WHERE request.uri LIKE '%test-uri%'
172 FORMAT Pretty
173 '';
174 in
175 ''
176 clickhouse.wait_for_unit("clickhouse")
177 clickhouse.wait_for_unit("vector")
178 clickhouse.wait_for_open_port(6000)
179 clickhouse.wait_for_open_port(8123)
180
181 clickhouse.succeed(
182 "cat ${databaseDDL} | clickhouse-client",
183 "cat ${tableDDL} | clickhouse-client",
184 "cat ${tableViewBase} | clickhouse-client",
185 "cat ${tableView} | clickhouse-client",
186 )
187
188 caddy.wait_for_unit("caddy")
189 caddy.wait_for_open_port(80)
190 caddy.wait_for_unit("vector")
191 caddy.wait_until_succeeds(
192 "journalctl -o cat -u vector.service | grep 'Vector has started'"
193 )
194
195 client.systemctl("start network-online.target")
196 client.wait_until_succeeds("curl http://caddy/test-uri")
197
198 caddy.wait_until_succeeds(
199 "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/caddy/access-caddy.log'"
200 )
201
202 clickhouse.wait_until_succeeds(
203 "cat ${selectQuery} | clickhouse-client | grep test-uri"
204 )
205 '';
206}