# Source viewer header: at 25.11-pre, 5.2 kB
import ../make-test-python.nix (
  { lib, pkgs, ... }:

  # Based on https://quickwit.io/docs/log-management/send-logs/using-vector
  #
  # Two-node test:
  #   * `quickwit` runs the Quickwit service and opens TCP port 7280.
  #   * `syslog` runs Vector, which generates demo syslog lines, remaps them
  #     with a VRL program and POSTs them as newline-delimited JSON to the
  #     `otel-logs-v0_7` ingest endpoint on the `quickwit` node.
  # The test script then queries Quickwit and checks that hits come back.

  {
    name = "vector-syslog-quickwit";
    meta.maintainers = [ lib.maintainers.happysalada ];

    nodes = {
      quickwit =
        { config, pkgs, ... }:
        {
          # jq is used by the test script to inspect the search responses.
          environment.systemPackages = [ pkgs.jq ];

          # Port targeted by the Vector HTTP sink on the `syslog` node.
          networking.firewall.allowedTCPPorts = [ 7280 ];

          services.quickwit = {
            enable = true;
            settings = {
              listen_address = "::";
            };
          };
        };

      syslog =
        { config, pkgs, ... }:
        {
          services.vector = {
            enable = true;

            settings = {
              sources = {
                # Synthetic syslog-formatted events, one every 0.5 s.
                generate_syslog = {
                  type = "demo_logs";
                  format = "syslog";
                  interval = 0.5;
                };
              };

              transforms = {
                # Parse each syslog line and rebuild the event with the
                # field names used by the queries in the test script
                # (`timestamp_nanos`, `severity_text`, ...), then drop the
                # original top-level fields.
                remap_syslog = {
                  inputs = [ "generate_syslog" ];
                  type = "remap";
                  source = ''
                    structured = parse_syslog!(.message)
                    .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
                    .body = structured
                    .service_name = structured.appname
                    .resource_attributes.source_type = .source_type
                    .resource_attributes.host.hostname = structured.hostname
                    .resource_attributes.service.name = structured.appname
                    .attributes.syslog.procid = structured.procid
                    .attributes.syslog.facility = structured.facility
                    .attributes.syslog.version = structured.version
                    .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
                      "ERROR"
                    } else if structured.severity == "warning" {
                      "WARN"
                    } else if structured.severity == "debug" {
                      "DEBUG"
                    } else if includes(["info", "notice"], structured.severity) {
                      "INFO"
                    } else {
                      structured.severity
                    }
                    .scope_name = structured.msgid
                    del(.message)
                    del(.host)
                    del(.timestamp)
                    del(.service)
                    del(.source_type)
                  '';
                };
              };

              sinks = {
                # Disabled console sink; enable it to print the remapped
                # events as JSON.
                #emit_syslog = {
                #  inputs = ["remap_syslog"];
                #  type = "console";
                #  encoding.codec = "json";
                #};
                quickwit_logs = {
                  type = "http";
                  method = "post";
                  inputs = [ "remap_syslog" ];
                  encoding.codec = "json";
                  framing.method = "newline_delimited";
                  uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
                };
              };
            };
          };
        };
    };

    testScript =
      let
        # Request body for the aggregation search issued at the end of the
        # test: a histogram over `timestamp_nanos` with a nested terms
        # aggregation on `severity_text`.
        aggregationQuery = pkgs.writeText "aggregation-query.json" ''
          {
            "query": "*",
            "max_hits": 0,
            "aggs": {
              "count_per_minute": {
                "histogram": {
                  "field": "timestamp_nanos",
                  "interval": 60000000
                },
                "aggs": {
                  "severity_text_count": {
                    "terms": {
                      "field": "severity_text"
                    }
                  }
                }
              }
            }
          }
        '';
      in
      ''
        quickwit.wait_for_unit("quickwit")
        quickwit.wait_for_open_port(7280)
        quickwit.wait_for_open_port(7281)

        quickwit.wait_until_succeeds(
          "journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
        )

        syslog.wait_for_unit("vector")
        syslog.wait_until_succeeds(
          "journalctl -o cat -u vector.service | grep 'Vector has started'"
        )

        quickwit.wait_until_succeeds(
          "journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
        )

        # Wait for logs to be generated
        # Test below aggregates by the minute
        syslog.sleep(60 * 2)

        # `jq -e` exits non-zero unless the filter yields true, so this passes
        # for any positive hit count. A textual "grep -v 0" check would reject
        # counts that merely contain the digit 0, such as 10 or 100.
        # The URL is single-quoted so the shell does not treat `?` as a glob.
        quickwit.wait_until_succeeds(
          "curl -sSf -XGET 'http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR' |"
          + " jq -e '.num_hits > 0'"
        )

        quickwit.wait_until_succeeds(
          "journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
        )

        quickwit.wait_until_succeeds(
          "curl -sSf -XPOST -H 'Content-Type: application/json' http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search --data @${aggregationQuery} |"
          + " jq -e '.num_hits > 0'"
        )

        quickwit.wait_until_succeeds(
          "journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
        )
      '';
  }
)