1{ lib, pkgs, ... }:
2
3# Based on https://quickwit.io/docs/log-management/send-logs/using-vector
4
5{
# Identity of this VM test and the nixpkgs maintainer listed for it.
name = "vector-syslog-quickwit";
meta = {
  maintainers = [ pkgs.lib.maintainers.happysalada ];
};
8
9 nodes = {
# Search backend VM: runs Quickwit, which receives the documents posted by
# the `syslog` node and answers the queries issued from the test script.
quickwit =
  { config, pkgs, ... }:
  {
    # Bind to the `::` wildcard address rather than a loopback-only one; the
    # `syslog` node reaches this service over the test network.
    services.quickwit.enable = true;
    services.quickwit.settings.listen_address = "::";

    # 7280 is the port used by the Vector HTTP sink and by the curl calls
    # in the test script.
    networking.firewall.allowedTCPPorts = [ 7280 ];

    # The test script pipes search responses through jq.
    environment.systemPackages = [ pkgs.jq ];
  };
24
# Log producer VM: runs Vector with a three-stage pipeline
#   demo_logs source -> remap transform -> HTTP sink
# that posts newline-delimited JSON to the `quickwit` node.
syslog =
  { config, pkgs, ... }:
  let
    # VRL program for the remap transform. It parses the generated syslog
    # line, copies the parsed fields into the attribute layout written
    # below, collapses the syslog severity into ERROR/WARN/DEBUG/INFO
    # (passing any other value through unchanged), and finally drops the
    # fields of the original event that are no longer needed.
    remapProgram = ''
      structured = parse_syslog!(.message)
      .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
      .body = structured
      .service_name = structured.appname
      .resource_attributes.source_type = .source_type
      .resource_attributes.host.hostname = structured.hostname
      .resource_attributes.service.name = structured.appname
      .attributes.syslog.procid = structured.procid
      .attributes.syslog.facility = structured.facility
      .attributes.syslog.version = structured.version
      .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
        "ERROR"
      } else if structured.severity == "warning" {
        "WARN"
      } else if structured.severity == "debug" {
        "DEBUG"
      } else if includes(["info", "notice"], structured.severity) {
        "INFO"
      } else {
        structured.severity
      }
      .scope_name = structured.msgid
      del(.message)
      del(.host)
      del(.timestamp)
      del(.service)
      del(.source_type)
    '';
  in
  {
    services.vector.enable = true;

    services.vector.settings = {
      # Synthetic syslog-formatted events; `interval` is passed to Vector
      # unchanged.
      sources.generate_syslog = {
        type = "demo_logs";
        format = "syslog";
        interval = 0.5;
      };

      transforms.remap_syslog = {
        type = "remap";
        inputs = [ "generate_syslog" ];
        source = remapProgram;
      };

      sinks = {
        # Uncomment to also print the remapped events on the console
        # while debugging the pipeline.
        #emit_syslog = {
        #  inputs = ["remap_syslog"];
        #  type = "console";
        #  encoding.codec = "json";
        #};

        # Ships each remapped event as one JSON document per line to the
        # ingest endpoint of the `otel-logs-v0_7` index on the quickwit VM.
        quickwit_logs = {
          type = "http";
          method = "post";
          inputs = [ "remap_syslog" ];
          uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
          encoding.codec = "json";
          framing.method = "newline_delimited";
        };
      };
    };
  };
94 };
95
# Python script run by the NixOS test driver.
#
# Flow: wait for Quickwit and Vector to come up, wait until Quickwit has
# published at least one split, let logs accumulate, then verify that
#   1. a plain search for `severity_text:ERROR` returns hits, and
#   2. a search carrying a histogram/terms aggregation returns hits,
# each time also checking that the request appears in the Quickwit journal.
testScript =
  let
    # Request body for the aggregation search: no documents are returned
    # (`max_hits` is 0); results are bucketed on `timestamp_nanos` and,
    # within each bucket, counted per `severity_text` value.
    aggregationQuery = pkgs.writeText "aggregation-query.json" ''
      {
        "query": "*",
        "max_hits": 0,
        "aggs": {
          "count_per_minute": {
            "histogram": {
              "field": "timestamp_nanos",
              "interval": 60000000
            },
            "aggs": {
              "severity_text_count": {
                "terms": {
                  "field": "severity_text"
                }
              }
            }
          }
        }
      }
    '';
  in
  ''
    quickwit.wait_for_unit("quickwit")
    quickwit.wait_for_open_port(7280)
    quickwit.wait_for_open_port(7281)

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
    )

    syslog.wait_for_unit("vector")
    syslog.wait_until_succeeds(
      "journalctl -o cat -u vector.service | grep 'Vector has started'"
    )

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
    )

    # Wait for logs to be generated
    # Test below aggregates by the minute
    syslog.sleep(60 * 2)

    # `jq -e` exits non-zero unless the expression yields true, so the check
    # passes for every positive hit count. (Filtering the number through
    # `grep -v '0'` would wrongly reject counts such as 10 or 105.)
    # The URL is quoted because `?` is a shell glob character.
    quickwit.wait_until_succeeds(
      "curl -sSf -XGET 'http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR' |"
      + " jq -e '.num_hits > 0'"
    )

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
    )

    quickwit.wait_until_succeeds(
      "curl -sSf -XPOST -H 'Content-Type: application/json' http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search --data @${aggregationQuery} |"
      + " jq -e '.num_hits > 0'"
    )

    quickwit.wait_until_succeeds(
      "journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
    )
  '';
160}