at master 4.9 kB view raw
{ lib, pkgs, ... }:

# Based on https://quickwit.io/docs/log-management/send-logs/using-vector
#
# Two-node NixOS test:
#   * `quickwit` runs the Quickwit service and accepts ingest/search requests
#     on TCP port 7280.
#   * `syslog` runs Vector, which generates demo syslog lines, remaps them with
#     a VRL program and POSTs them as newline-delimited JSON to the
#     `otel-logs-v0_7` ingest endpoint on the `quickwit` node.
# The test script then checks that the ingested documents can be searched and
# aggregated.

{
  name = "vector-syslog-quickwit";
  meta.maintainers = [ pkgs.lib.maintainers.happysalada ];

  nodes = {
    quickwit =
      { config, pkgs, ... }:
      {
        # jq is used by the test script to inspect search responses.
        environment.systemPackages = [ pkgs.jq ];

        # Port 7280 must be reachable from the `syslog` node (see the sink URI
        # below).
        networking.firewall.allowedTCPPorts = [ 7280 ];

        services.quickwit = {
          enable = true;
          settings = {
            listen_address = "::";
          };
        };
      };

    syslog =
      { config, pkgs, ... }:
      {
        services.vector = {
          enable = true;

          settings = {
            sources = {
              # Synthetic syslog-formatted events, so the test needs no real
              # log producer.
              generate_syslog = {
                type = "demo_logs";
                format = "syslog";
                interval = 0.5;
              };
            };

            transforms = {
              # Parses each syslog line and rewrites the event into the field
              # layout sent to the otel-logs index (timestamp_nanos, body,
              # service_name, resource_attributes, attributes, severity_text,
              # scope_name), then drops the original top-level fields.
              remap_syslog = {
                inputs = [ "generate_syslog" ];
                type = "remap";
                source = ''
                  structured = parse_syslog!(.message)
                  .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
                  .body = structured
                  .service_name = structured.appname
                  .resource_attributes.source_type = .source_type
                  .resource_attributes.host.hostname = structured.hostname
                  .resource_attributes.service.name = structured.appname
                  .attributes.syslog.procid = structured.procid
                  .attributes.syslog.facility = structured.facility
                  .attributes.syslog.version = structured.version
                  .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
                    "ERROR"
                  } else if structured.severity == "warning" {
                    "WARN"
                  } else if structured.severity == "debug" {
                    "DEBUG"
                  } else if includes(["info", "notice"], structured.severity) {
                    "INFO"
                  } else {
                    structured.severity
                  }
                  .scope_name = structured.msgid
                  del(.message)
                  del(.host)
                  del(.timestamp)
                  del(.service)
                  del(.source_type)
                '';
              };
            };

            sinks = {
              # Debugging aid: uncomment to print the remapped events on the
              # console instead of (or in addition to) shipping them.
              #emit_syslog = {
              #  inputs = ["remap_syslog"];
              #  type = "console";
              #  encoding.codec = "json";
              #};
              quickwit_logs = {
                type = "http";
                method = "post";
                inputs = [ "remap_syslog" ];
                encoding.codec = "json";
                framing.method = "newline_delimited";
                uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
              };
            };
          };
        };
      };
  };

  testScript =
    let
      # Search request body: no hits returned (max_hits = 0), only a histogram
      # over timestamp_nanos with a nested terms aggregation on severity_text.
      # NOTE(review): the interval of 60000000 corresponds to one minute only
      # if the histogram unit is microseconds - confirm against the Quickwit
      # aggregation documentation for the deployed version.
      aggregationQuery = pkgs.writeText "aggregation-query.json" ''
        {
          "query": "*",
          "max_hits": 0,
          "aggs": {
            "count_per_minute": {
              "histogram": {
                "field": "timestamp_nanos",
                "interval": 60000000
              },
              "aggs": {
                "severity_text_count": {
                  "terms": {
                    "field": "severity_text"
                  }
                }
              }
            }
          }
        }
      '';
    in
    ''
      quickwit.wait_for_unit("quickwit")
      quickwit.wait_for_open_port(7280)
      quickwit.wait_for_open_port(7281)

      quickwit.wait_until_succeeds(
        "journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
      )

      syslog.wait_for_unit("vector")
      syslog.wait_until_succeeds(
        "journalctl -o cat -u vector.service | grep 'Vector has started'"
      )

      quickwit.wait_until_succeeds(
        "journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
      )

      # Wait for logs to be generated
      # Test below aggregates by the minute
      syslog.sleep(60 * 2)

      # The hit count is checked numerically with jq -e (exit status follows
      # the boolean result). A textual "grep -v 0" would wrongly reject valid
      # counts that merely contain the digit zero, such as 10 or 100.
      # The URL is quoted because it contains "?", a shell glob character.
      quickwit.wait_until_succeeds(
        "curl -sSf -XGET 'http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR' |"
        + " jq -e '.num_hits > 0'"
      )

      quickwit.wait_until_succeeds(
        "journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
      )

      quickwit.wait_until_succeeds(
        "curl -sSf -XPOST -H 'Content-Type: application/json' http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search --data @${aggregationQuery} |"
        + " jq -e '.num_hits > 0'"
      )

      quickwit.wait_until_succeeds(
        "journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
      )
    '';
}