1import ../make-test-python.nix (
2 { lib, pkgs, ... }:
3
4 # Based on https://quickwit.io/docs/log-management/send-logs/using-vector
5
6 {
# Test identity. `lib` is already an argument of the enclosing test function,
# so the maintainer list is taken from it directly instead of via `pkgs.lib`.
name = "vector-syslog-quickwit";
meta.maintainers = [ lib.maintainers.happysalada ];
9
10 nodes = {
quickwit =
  { config, pkgs, ... }:
  {
    # jq is used by the test script to read `num_hits` out of search responses.
    environment.systemPackages = [ pkgs.jq ];

    # 7280 is the port the Vector HTTP sink on the `syslog` node posts to.
    networking.firewall.allowedTCPPorts = [ 7280 ];

    # Listen on the `::` wildcard address, presumably so that the `syslog`
    # node can reach the service over the test network.
    services.quickwit.enable = true;
    services.quickwit.settings.listen_address = "::";
  };
25
syslog =
  { config, pkgs, ... }:
  let
    # VRL program run by the `remap_syslog` transform. It parses the raw
    # syslog line in `.message`, copies the parsed fields into the
    # timestamp/body/resource/attribute fields sent to the otel-logs index,
    # collapses the syslog severity into ERROR/WARN/DEBUG/INFO (any other
    # severity is passed through unchanged), and finally removes the original
    # top-level fields.
    remapProgram = ''
      structured = parse_syslog!(.message)
      .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
      .body = structured
      .service_name = structured.appname
      .resource_attributes.source_type = .source_type
      .resource_attributes.host.hostname = structured.hostname
      .resource_attributes.service.name = structured.appname
      .attributes.syslog.procid = structured.procid
      .attributes.syslog.facility = structured.facility
      .attributes.syslog.version = structured.version
      .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
        "ERROR"
      } else if structured.severity == "warning" {
        "WARN"
      } else if structured.severity == "debug" {
        "DEBUG"
      } else if includes(["info", "notice"], structured.severity) {
        "INFO"
      } else {
        structured.severity
      }
      .scope_name = structured.msgid
      del(.message)
      del(.host)
      del(.timestamp)
      del(.service)
      del(.source_type)
    '';
  in
  {
    services.vector.enable = true;

    services.vector.settings = {
      # Synthetic syslog-formatted demo events, one every 0.5 (interval).
      sources.generate_syslog = {
        type = "demo_logs";
        format = "syslog";
        interval = 0.5;
      };

      # Reshape each generated event with the VRL program defined above.
      transforms.remap_syslog = {
        type = "remap";
        inputs = [ "generate_syslog" ];
        source = remapProgram;
      };

      # Disabled debugging sink: a console sink for the remapped events.
      #sinks.emit_syslog = {
      #  inputs = [ "remap_syslog" ];
      #  type = "console";
      #  encoding.codec = "json";
      #};

      # POST the remapped events as newline-delimited JSON to the ingest
      # endpoint of the `otel-logs-v0_7` index on the `quickwit` node.
      sinks.quickwit_logs = {
        type = "http";
        method = "post";
        uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
        inputs = [ "remap_syslog" ];
        encoding = {
          codec = "json";
        };
        framing = {
          method = "newline_delimited";
        };
      };
    };
  };
95 };
96
testScript =
  let
    # Body of the aggregation search request: no hits are returned
    # (max_hits = 0), only a histogram over `timestamp_nanos` with a nested
    # terms aggregation on `severity_text`.
    # NOTE(review): the interval 60000000 is presumably one minute expressed
    # in microseconds -- confirm against the Quickwit aggregation docs.
    aggregationQuery = pkgs.writeText "aggregation-query.json" ''
      {
        "query": "*",
        "max_hits": 0,
        "aggs": {
          "count_per_minute": {
            "histogram": {
              "field": "timestamp_nanos",
              "interval": 60000000
            },
            "aggs": {
              "severity_text_count": {
                "terms": {
                  "field": "severity_text"
                }
              }
            }
          }
        }
      }
    '';
  in
  ''
    # Quickwit must be up, listening on both ports, and report readiness.
    quickwit.wait_for_unit("quickwit")
    quickwit.wait_for_open_port(7280)
    quickwit.wait_for_open_port(7281)

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
    )

    # Vector must be running on the log-producing node.
    syslog.wait_for_unit("vector")
    syslog.wait_until_succeeds(
      "journalctl -o cat -u vector.service | grep 'Vector has started'"
    )

    # Wait until Quickwit logs that it has published new splits.
    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
    )

    # Wait for logs to be generated.
    # The aggregation below buckets by the minute.
    syslog.sleep(60 * 2)

    # At least one ERROR-severity document must be searchable.
    # jq -e exits non-zero unless the expression is true, so every positive
    # hit count passes; a textual "grep -v 0" would reject counts such as
    # 10 or 100. The URL is quoted because it contains the glob character "?".
    quickwit.wait_until_succeeds(
      "curl -sSf -XGET 'http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR' |"
      + " jq -e '.num_hits > 0'"
    )

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
    )

    # The aggregation request must also match at least one document.
    quickwit.wait_until_succeeds(
      "curl -sSf -XPOST -H 'Content-Type: application/json' 'http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search' --data @${aggregationQuery} |"
      + " jq -e '.num_hits > 0'"
    )

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
    )
  '';
161 }
162)