at master 5.6 kB view raw
1{ lib, pkgs, ... }: 2 3{ 4 name = "vector-caddy-clickhouse"; 5 meta.maintainers = [ pkgs.lib.maintainers.happysalada ]; 6 7 nodes = { 8 caddy = 9 { config, pkgs, ... }: 10 { 11 networking.firewall.allowedTCPPorts = [ 80 ]; 12 13 services.caddy = { 14 enable = true; 15 virtualHosts = { 16 "http://caddy" = { 17 extraConfig = '' 18 encode gzip 19 20 file_server 21 root /srv 22 ''; 23 logFormat = " 24 output file ${config.services.caddy.logDir}/access-caddy.log { 25 mode 0640 26 } 27 "; 28 }; 29 }; 30 }; 31 32 systemd.services.vector.serviceConfig = { 33 SupplementaryGroups = [ "caddy" ]; 34 }; 35 36 services.vector = { 37 enable = true; 38 39 settings = { 40 sources = { 41 caddy-log = { 42 type = "file"; 43 include = [ "/var/log/caddy/*.log" ]; 44 }; 45 }; 46 47 transforms = { 48 caddy_logs_timestamp = { 49 type = "remap"; 50 inputs = [ "caddy-log" ]; 51 source = '' 52 .tmp_timestamp, err = parse_json!(.message).ts * 1000000 53 54 if err != null { 55 log("Unable to parse ts value: " + err, level: "error") 56 } else { 57 .timestamp = from_unix_timestamp!(to_int!(.tmp_timestamp), unit: "microseconds") 58 } 59 60 del(.tmp_timestamp) 61 ''; 62 }; 63 }; 64 65 sinks = { 66 vector_sink = { 67 type = "vector"; 68 inputs = [ "caddy_logs_timestamp" ]; 69 address = "clickhouse:6000"; 70 }; 71 }; 72 }; 73 }; 74 }; 75 76 client = 77 { config, pkgs, ... }: 78 { 79 environment.systemPackages = [ pkgs.curl ]; 80 }; 81 82 clickhouse = 83 { config, pkgs, ... }: 84 { 85 virtualisation.memorySize = 4096; 86 87 networking.firewall.allowedTCPPorts = [ 6000 ]; 88 89 services.vector = { 90 enable = true; 91 92 settings = { 93 sources = { 94 vector_source = { 95 type = "vector"; 96 address = "[::]:6000"; 97 }; 98 }; 99 100 sinks = { 101 clickhouse = { 102 type = "clickhouse"; 103 inputs = [ 104 "vector_source" 105 ]; 106 endpoint = "http://localhost:8123"; 107 database = "caddy"; 108 table = "access_logs"; 109 date_time_best_effort = true; 110 skip_unknown_fields = true; 111 }; 112 }; 113 }; 114 115 }; 116 117 services.clickhouse = { 118 enable = true; 119 }; 120 }; 121 }; 122 123 testScript = 124 let 125 # work around quote/substitution complexity by Nix, Perl, bash and SQL. 126 databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS caddy"; 127 128 tableDDL = pkgs.writeText "table.sql" '' 129 CREATE TABLE IF NOT EXISTS caddy.access_logs ( 130 timestamp DateTime64(6), 131 host LowCardinality(String), 132 message String, 133 ) 134 ENGINE = MergeTree() 135 ORDER BY timestamp 136 PARTITION BY toYYYYMM(timestamp) 137 ''; 138 139 tableViewBase = pkgs.writeText "table-view-base.sql" '' 140 CREATE TABLE IF NOT EXISTS caddy.access_logs_view_base ( 141 timestamp DateTime64(6), 142 host LowCardinality(String), 143 request JSON, 144 status UInt16, 145 ) 146 ENGINE = MergeTree() 147 ORDER BY timestamp 148 PARTITION BY toYYYYMM(timestamp) 149 ''; 150 151 tableView = pkgs.writeText "table-view.sql" '' 152 CREATE MATERIALIZED VIEW IF NOT EXISTS caddy.access_logs_view TO caddy.access_logs_view_base 153 AS SELECT 154 timestamp, 155 host, 156 simpleJSONExtractRaw(message, 'request') AS request, 157 simpleJSONExtractRaw(message, 'status') AS status 158 FROM caddy.access_logs; 159 ''; 160 161 selectQuery = pkgs.writeText "select.sql" '' 162 SELECT 163 timestamp, 164 request.host, 165 request.remote_ip, 166 request.proto, 167 request.method, 168 request.uri, 169 status 170 FROM caddy.access_logs_view_base 171 WHERE request.uri LIKE '%test-uri%' 172 FORMAT Pretty 173 ''; 174 in 175 '' 176 clickhouse.wait_for_unit("clickhouse") 177 clickhouse.wait_for_unit("vector") 178 clickhouse.wait_for_open_port(6000) 179 clickhouse.wait_for_open_port(8123) 180 181 clickhouse.succeed( 182 "cat ${databaseDDL} | clickhouse-client", 183 "cat ${tableDDL} | clickhouse-client", 184 "cat ${tableViewBase} | clickhouse-client", 185 "cat ${tableView} | clickhouse-client", 186 ) 187 188 caddy.wait_for_unit("caddy") 189 caddy.wait_for_open_port(80) 190 caddy.wait_for_unit("vector") 191 caddy.wait_until_succeeds( 192 "journalctl -o cat -u vector.service | grep 'Vector has started'" 193 ) 194 195 client.systemctl("start network-online.target") 196 client.wait_until_succeeds("curl http://caddy/test-uri") 197 198 caddy.wait_until_succeeds( 199 "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/caddy/access-caddy.log'" 200 ) 201 202 clickhouse.wait_until_succeeds( 203 "cat ${selectQuery} | clickhouse-client | grep test-uri" 204 ) 205 ''; 206}