1{ lib, pkgs, ... }:
2let
3 # Take the original journald message and create a new payload which only
4 # contains the relevant fields - these must match the database columns.
5 journalVrlRemapTransform = {
6 journald_remap = {
7 inputs = [ "journald" ];
8 type = "remap";
9 source = ''
10 m = {}
11 m.app = .SYSLOG_IDENTIFIER
12 m.host = .host
13 m.boot_id = ._BOOT_ID
14 m.severity = to_int(.PRIORITY) ?? 0
15 m.level = to_syslog_level(m.severity) ?? ""
16 m.message = strip_ansi_escape_codes!(.message)
17 m.timestamp = .timestamp
18 m.uid = to_int(._UID) ?? 0
19 m.pid = to_int(._PID) ?? 0
20 . = [m]
21 '';
22 };
23 };
24in
25{
26 name = "vector-journald-clickhouse";
27 meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
28
29 nodes = {
30 clickhouse =
31 { config, pkgs, ... }:
32 {
33 virtualisation.diskSize = 5 * 1024;
34 virtualisation.memorySize = 4096;
35
36 networking.firewall.allowedTCPPorts = [ 6000 ];
37
38 services.vector = {
39 enable = true;
40 journaldAccess = true;
41
42 settings = {
43 sources = {
44 journald = {
45 type = "journald";
46 };
47
48 vector_source = {
49 type = "vector";
50 address = "[::]:6000";
51 };
52 };
53
54 transforms = journalVrlRemapTransform;
55
56 sinks = {
57 clickhouse = {
58 type = "clickhouse";
59 inputs = [
60 "journald_remap"
61 "vector_source"
62 ];
63 endpoint = "http://localhost:8123";
64 auth = {
65 strategy = "basic";
66 user = "vector";
67 password = "helloclickhouseworld";
68 };
69 database = "journald";
70 table = "logs";
71 date_time_best_effort = true;
72 };
73 };
74 };
75
76 };
77
78 services.clickhouse = {
79 enable = true;
80 };
81
82 # ACL configuration for Vector
83 environment = {
84 etc."clickhouse-server/users.d/vector.xml".text = ''
85 <clickhouse>
86 <users>
87 <vector>
88 <password>helloclickhouseworld</password>
89
90 <access_management>0</access_management>
91
92 <quota>default</quota>
93 <default_database>journald</default_database>
94
95 <grants>
96 <query>GRANT INSERT ON journald.logs</query>
97 </grants>
98 </vector>
99 </users>
100 </clickhouse>
101 '';
102
103 # ACL configuration for read-only client
104 etc."clickhouse-server/users.d/grafana.xml".text = ''
105 <clickhouse>
106 <users>
107 <grafana>
108 <password>helloclickhouseworld2</password>
109
110 <access_management>0</access_management>
111
112 <quota>default</quota>
113 <default_database>journald</default_database>
114
115 <grants>
116 <query>GRANT SELECT ON journald.logs</query>
117 </grants>
118 </grafana>
119 </users>
120 </clickhouse>
121 '';
122 };
123 };
124
125 vector =
126 { config, pkgs, ... }:
127 {
128 services.vector = {
129 enable = true;
130 journaldAccess = true;
131
132 settings = {
133 sources = {
134 journald = {
135 type = "journald";
136 };
137 };
138
139 transforms = journalVrlRemapTransform;
140
141 sinks = {
142 vector_sink = {
143 type = "vector";
144 inputs = [ "journald_remap" ];
145 address = "clickhouse:6000";
146 };
147 };
148 };
149 };
150 };
151 };
152
153 testScript =
154 let
155 # work around quote/substitution complexity by Nix, Perl, bash and SQL.
156 databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";
157
158 # https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
159 # ORDER BY advice: https://kb.altinity.com/engines/mergetree-table-engine-family/pick-keys/
160 tableDDL = pkgs.writeText "table.sql" ''
161 CREATE TABLE IF NOT EXISTS journald.logs (
162 timestamp DateTime64(6),
163 host LowCardinality(String),
164 boot_id LowCardinality(String),
165 app LowCardinality(String),
166 level LowCardinality(String),
167 severity UInt8,
168 message String,
169 uid UInt16,
170 pid UInt32,
171 )
172 ENGINE = MergeTree()
173 ORDER BY (host, boot_id, toStartOfHour(timestamp), app, timestamp)
174 PARTITION BY toYYYYMM(timestamp)
175 '';
176
177 selectQuery = pkgs.writeText "select.sql" ''
178 SELECT COUNT(host) FROM journald.logs
179 WHERE message LIKE '%Vector has started%'
180 '';
181 in
182 ''
183 clickhouse.wait_for_unit("clickhouse")
184 clickhouse.wait_for_open_port(6000)
185 clickhouse.wait_for_open_port(8123)
186
187 clickhouse.succeed(
188 "cat ${databaseDDL} | clickhouse-client"
189 )
190
191 clickhouse.succeed(
192 "cat ${tableDDL} | clickhouse-client"
193 )
194
195 for machine in clickhouse, vector:
196 machine.wait_for_unit("vector")
197 machine.wait_until_succeeds(
198 "journalctl -o cat -u vector.service | grep 'Vector has started'"
199 )
200
201 clickhouse.fail(
202 "cat ${selectQuery} | clickhouse-client --user vector --password helloclickhouseworld | grep 2"
203 )
204
205 clickhouse.wait_until_succeeds(
206 "cat ${selectQuery} | clickhouse-client --user grafana --password helloclickhouseworld2 | grep 2"
207 )
208 '';
209}