at master 4.8 kB view raw
# NixOS VM test: ship nginx access logs through Vector into ClickHouse.
#
# Two machines are defined:
#   * `nginx` serves a default virtual host and runs a Vector agent that tails
#     /var/log/nginx/access.log and forwards every line to `clickhouse:6000`.
#   * `clickhouse` runs a Vector instance listening on port 6000 that writes
#     the received events into the ClickHouse table `nginxdb.access_logs`.
#
# The test script creates the database objects, issues two HTTP requests with
# curl and then waits until a row containing "curl" can be read back through
# the materialized view `nginxdb.access_logs_view`.
{ lib, pkgs, ... }:

{
  name = "vector-nginx-clickhouse";
  meta.maintainers = [ lib.maintainers.happysalada ];

  nodes = {
    clickhouse =
      { ... }:
      {
        virtualisation.memorySize = 4096;

        # Clickhouse module can't listen on a non-loopback IP, so only the
        # Vector port is opened and Vector talks to ClickHouse via localhost.
        networking.firewall.allowedTCPPorts = [ 6000 ];
        services.clickhouse.enable = true;

        # Exercise Vector sink->source for now.
        services.vector = {
          enable = true;

          settings = {
            sources = {
              # Receives events sent by the `vector` sink on the nginx node.
              vector_source = {
                type = "vector";
                address = "[::]:6000";
              };
            };

            sinks = {
              clickhouse = {
                type = "clickhouse";
                inputs = [ "vector_source" ];
                endpoint = "http://localhost:8123";
                database = "nginxdb";
                table = "access_logs";
                # The target table only declares a `message` column (see
                # tableDDL below); presumably this drops every other event
                # field instead of failing the insert.
                skip_unknown_fields = true;
              };
            };
          };
        };
      };

    nginx =
      { ... }:
      {
        services.nginx = {
          enable = true;
          virtualHosts.localhost = { };
        };

        services.vector = {
          enable = true;

          settings = {
            sources = {
              nginx_logs = {
                type = "file";
                include = [ "/var/log/nginx/access.log" ];
                read_from = "end";
              };
            };

            sinks = {
              # Forwards to the `vector` source on the clickhouse node.
              vector_sink = {
                type = "vector";
                inputs = [ "nginx_logs" ];
                address = "clickhouse:6000";
              };
            };
          };
        };

        # Run Vector with the nginx group as a supplementary group;
        # presumably required to read the access log -- TODO confirm.
        systemd.services.vector.serviceConfig = {
          SupplementaryGroups = [ "nginx" ];
        };
      };
  };

  testScript =
    let
      # The SQL statements live in store files and are fed to
      # clickhouse-client on stdin; this works around the
      # quote/substitution complexity of Nix, Python, bash and SQL.
      databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";

      # Raw landing table: one row per access-log line, unparsed.
      tableDDL = pkgs.writeText "table.sql" ''
        CREATE TABLE IF NOT EXISTS nginxdb.access_logs (
          message String
        )
        ENGINE = MergeTree()
        ORDER BY tuple()
      '';

      # Materialized view that splits each raw log line into typed columns.
      # Graciously taken from https://clickhouse.com/docs/en/integrations/vector
      tableView = pkgs.writeText "table-view.sql" ''
        CREATE MATERIALIZED VIEW IF NOT EXISTS nginxdb.access_logs_view
        (
          RemoteAddr String,
          Client String,
          RemoteUser String,
          TimeLocal DateTime,
          RequestMethod String,
          Request String,
          HttpVersion String,
          Status Int32,
          BytesSent Int64,
          UserAgent String
        )
        ENGINE = MergeTree()
        ORDER BY RemoteAddr
        POPULATE AS
        WITH
          splitByWhitespace(message) as split,
          splitByRegexp('\S \d+ "([^"]*)"', message) as referer
        SELECT
          split[1] AS RemoteAddr,
          split[2] AS Client,
          split[3] AS RemoteUser,
          parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
          trim(LEADING '"' FROM split[6]) AS RequestMethod,
          split[7] AS Request,
          trim(TRAILING '"' FROM split[8]) AS HttpVersion,
          split[9] AS Status,
          split[10] AS BytesSent,
          trim(BOTH '"' from referer[2]) AS UserAgent
        FROM
          (SELECT message FROM nginxdb.access_logs)
      '';

      selectQuery = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";
    in
    ''
      # Bring up ClickHouse and the receiving Vector instance.
      clickhouse.wait_for_unit("clickhouse")
      clickhouse.wait_for_open_port(8123)

      clickhouse.wait_until_succeeds(
        "journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
      )

      clickhouse.wait_for_unit("vector")
      clickhouse.wait_for_open_port(6000)

      # Create the database, the raw table and the parsing view.
      clickhouse.succeed(
        "clickhouse-client < ${databaseDDL}"
      )

      clickhouse.succeed(
        "clickhouse-client < ${tableDDL}"
      )

      clickhouse.succeed(
        "clickhouse-client < ${tableView}"
      )

      # Bring up nginx and the log-shipping Vector instance.
      nginx.wait_for_unit("nginx")
      nginx.wait_for_open_port(80)
      nginx.wait_for_unit("vector")
      nginx.wait_until_succeeds(
        "journalctl -o cat -u vector.service | grep 'Starting file server'"
      )

      # Generate access-log entries.
      nginx.succeed("curl http://localhost/")
      nginx.succeed("curl http://localhost/")

      nginx.wait_for_file("/var/log/nginx/access.log")
      nginx.wait_until_succeeds(
        "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
      )

      # The curl requests must eventually show up in the parsed view.
      clickhouse.wait_until_succeeds(
        "clickhouse-client < ${selectQuery} | grep 'curl'"
      )
    '';
}