at master 5.9 kB view raw
# NixOS VM test: two machines run Vector, read their own journal and ship the
# entries into a ClickHouse table on the `clickhouse` machine. The `vector`
# machine forwards its entries over Vector's native protocol (TCP 6000); the
# `clickhouse` machine inserts both streams through the ClickHouse HTTP
# endpoint on port 8123.
{ lib, pkgs, ... }:
let
  # Password of the ClickHouse account Vector uses to insert rows. Shared by
  # the sink configuration, the server-side ACL file and the test script so
  # the three cannot drift apart.
  vectorPassword = "helloclickhouseworld";

  # Password of the ClickHouse account that is only allowed to read the logs.
  grafanaPassword = "helloclickhouseworld2";

  # Take the original journald message and create a new payload which only
  # contains the relevant fields - these must match the database columns.
  journalVrlRemapTransform = {
    journald_remap = {
      inputs = [ "journald" ];
      type = "remap";
      source = ''
        m = {}
        m.app = .SYSLOG_IDENTIFIER
        m.host = .host
        m.boot_id = ._BOOT_ID
        m.severity = to_int(.PRIORITY) ?? 0
        m.level = to_syslog_level(m.severity) ?? ""
        m.message = strip_ansi_escape_codes!(.message)
        m.timestamp = .timestamp
        m.uid = to_int(._UID) ?? 0
        m.pid = to_int(._PID) ?? 0
        . = [m]
      '';
    };
  };
in
{
  name = "vector-journald-clickhouse";
  meta.maintainers = [ lib.maintainers.happysalada ];

  nodes = {
    # Runs the ClickHouse server plus a Vector instance that ingests both the
    # local journal and the entries forwarded by the `vector` machine.
    clickhouse =
      { config, pkgs, ... }:
      {
        virtualisation.diskSize = 5 * 1024;
        virtualisation.memorySize = 4096;

        # Port of the `vector_source` listener below.
        networking.firewall.allowedTCPPorts = [ 6000 ];

        services.vector = {
          enable = true;
          journaldAccess = true;

          settings = {
            sources = {
              journald = {
                type = "journald";
              };

              # Receives the events sent by the `vector_sink` of the other machine.
              vector_source = {
                type = "vector";
                address = "[::]:6000";
              };
            };

            transforms = journalVrlRemapTransform;

            sinks = {
              clickhouse = {
                type = "clickhouse";
                # Local entries are remapped here; forwarded entries were
                # already remapped on the sending machine.
                inputs = [
                  "journald_remap"
                  "vector_source"
                ];
                endpoint = "http://localhost:8123";
                auth = {
                  strategy = "basic";
                  user = "vector";
                  password = vectorPassword;
                };
                database = "journald";
                table = "logs";
                date_time_best_effort = true;
              };
            };
          };

        };

        services.clickhouse = {
          enable = true;
        };

        environment = {
          # ACL configuration for Vector: may only INSERT into journald.logs.
          etc."clickhouse-server/users.d/vector.xml".text = ''
            <clickhouse>
              <users>
                <vector>
                  <password>${vectorPassword}</password>

                  <access_management>0</access_management>

                  <quota>default</quota>
                  <default_database>journald</default_database>

                  <grants>
                    <query>GRANT INSERT ON journald.logs</query>
                  </grants>
                </vector>
              </users>
            </clickhouse>
          '';

          # ACL configuration for read-only client: may only SELECT from journald.logs.
          etc."clickhouse-server/users.d/grafana.xml".text = ''
            <clickhouse>
              <users>
                <grafana>
                  <password>${grafanaPassword}</password>

                  <access_management>0</access_management>

                  <quota>default</quota>
                  <default_database>journald</default_database>

                  <grants>
                    <query>GRANT SELECT ON journald.logs</query>
                  </grants>
                </grafana>
              </users>
            </clickhouse>
          '';
        };
      };

    # Runs only Vector: reads the local journal, remaps it and forwards the
    # result to the Vector instance on the `clickhouse` machine.
    vector =
      { config, pkgs, ... }:
      {
        services.vector = {
          enable = true;
          journaldAccess = true;

          settings = {
            sources = {
              journald = {
                type = "journald";
              };
            };

            transforms = journalVrlRemapTransform;

            sinks = {
              vector_sink = {
                type = "vector";
                inputs = [ "journald_remap" ];
                address = "clickhouse:6000";
              };
            };
          };
        };
      };
  };

  testScript =
    let
      # Work around quote/substitution complexity by Nix, Python, bash and SQL:
      # every statement is written to a store file and piped into the client.
      databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";

      # https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
      # ORDER BY advice: https://kb.altinity.com/engines/mergetree-table-engine-family/pick-keys/
      #
      # uid is UInt32 rather than UInt16: Linux UIDs are 32 bits wide, so a
      # 16-bit column cannot hold every value journald may report in _UID.
      tableDDL = pkgs.writeText "table.sql" ''
        CREATE TABLE IF NOT EXISTS journald.logs (
          timestamp DateTime64(6),
          host LowCardinality(String),
          boot_id LowCardinality(String),
          app LowCardinality(String),
          level LowCardinality(String),
          severity UInt8,
          message String,
          uid UInt32,
          pid UInt32,
        )
        ENGINE = MergeTree()
        ORDER BY (host, boot_id, toStartOfHour(timestamp), app, timestamp)
        PARTITION BY toYYYYMM(timestamp)
      '';

      # Counts the stored startup messages of Vector; one is expected per machine.
      selectQuery = pkgs.writeText "select.sql" ''
        SELECT COUNT(host) FROM journald.logs
        WHERE message LIKE '%Vector has started%'
      '';
    in
    ''
      clickhouse.wait_for_unit("clickhouse")
      # 6000: Vector listener, 8123: ClickHouse HTTP endpoint used by the sink.
      clickhouse.wait_for_open_port(6000)
      clickhouse.wait_for_open_port(8123)

      clickhouse.succeed(
        "cat ${databaseDDL} | clickhouse-client"
      )

      clickhouse.succeed(
        "cat ${tableDDL} | clickhouse-client"
      )

      for machine in clickhouse, vector:
        machine.wait_for_unit("vector")
        machine.wait_until_succeeds(
          "journalctl -o cat -u vector.service | grep 'Vector has started'"
        )

      # The vector account is only granted INSERT, so reading must be rejected.
      # Deliberately no grep on the output: the command has to fail because
      # clickhouse-client itself reports an error, not because the row count
      # happens not to match while the logs are still in flight.
      clickhouse.fail(
        "cat ${selectQuery} | clickhouse-client --user vector --password ${vectorPassword}"
      )

      # The read-only account must eventually see one startup message per machine.
      clickhouse.wait_until_succeeds(
        "cat ${selectQuery} | clickhouse-client --user grafana --password ${grafanaPassword} | grep 2"
      )
    '';
}