1{
2 config,
3 lib,
4 pkgs,
5 ...
6}:
7let
8 cfg = config.services.druid;
9 inherit (lib)
10 concatStrings
11 concatStringsSep
12 mapAttrsToList
13 concatMap
14 attrByPath
15 mkIf
16 mkMerge
17 mkEnableOption
18 mkOption
19 types
20 mkPackageOption
21 ;
22
23 druidServiceOption = serviceName: {
24 enable = mkEnableOption serviceName;
25
26 restartIfChanged = mkOption {
27 type = types.bool;
28 description = ''
29 Automatically restart the service on config change.
30 This can be set to false to defer restarts on clusters running critical applications.
31 Please consider the security implications of inadvertently running an older version,
32 and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
33 '';
34 default = false;
35 };
36
37 config = mkOption {
38 default = { };
39 type = types.attrsOf types.anything;
40 description = ''
41 (key=value) Configuration to be written to runtime.properties of the druid ${serviceName}
42 <https://druid.apache.org/docs/latest/configuration/index.html>
43 '';
44 example = {
45 "druid.plainTextPort" = "8082";
46 "druid.service" = "servicename";
47 };
48 };
49
50 jdk = mkPackageOption pkgs "JDK" { default = [ "jdk17_headless" ]; };
51
52 jvmArgs = mkOption {
53 type = types.str;
54 default = "";
55 description = "Arguments to pass to the JVM";
56 };
57
58 openFirewall = mkOption {
59 type = types.bool;
60 default = false;
61 description = "Open firewall ports for ${serviceName}.";
62 };
63
64 internalConfig = mkOption {
65 default = { };
66 type = types.attrsOf types.anything;
67 internal = true;
68 description = "Internal Option to add to runtime.properties for ${serviceName}.";
69 };
70 };
71
72 druidServiceConfig =
73 {
74 name,
75 serviceOptions ? cfg."${name}",
76 allowedTCPPorts ? [ ],
77 tmpDirs ? [ ],
78 extraConfig ? { },
79 }:
80 (mkIf serviceOptions.enable (mkMerge [
81 {
82 systemd = {
83 services."druid-${name}" = {
84 after = [ "network.target" ];
85
86 description = "Druid ${name}";
87
88 wantedBy = [ "multi-user.target" ];
89
90 inherit (serviceOptions) restartIfChanged;
91
92 path = [
93 cfg.package
94 serviceOptions.jdk
95 ];
96
97 script =
98 let
99 cfgFile =
100 fileName: properties:
101 pkgs.writeTextDir fileName (
102 concatStringsSep "\n" (mapAttrsToList (n: v: "${n}=${toString v}") properties)
103 );
104
105 commonConfigFile = cfgFile "common.runtime.properties" cfg.commonConfig;
106
107 configFile = cfgFile "runtime.properties" (serviceOptions.config // serviceOptions.internalConfig);
108
109 extraClassPath = concatStrings (map (path: ":" + path) cfg.extraClassPaths);
110
111 extraConfDir = concatStrings (map (dir: ":" + dir + "/*") cfg.extraConfDirs);
112 in
113 ''
114 run-java -Dlog4j.configurationFile=file:${cfg.log4j} \
115 -Ddruid.extensions.directory=${cfg.package}/extensions \
116 -Ddruid.extensions.hadoopDependenciesDir=${cfg.package}/hadoop-dependencies \
117 -classpath ${commonConfigFile}:${configFile}:${cfg.package}/lib/\*${extraClassPath}${extraConfDir} \
118 ${serviceOptions.jvmArgs} \
119 org.apache.druid.cli.Main server ${name}
120 '';
121
122 serviceConfig = {
123 User = "druid";
124 SyslogIdentifier = "druid-${name}";
125 Restart = "always";
126 };
127 };
128
129 tmpfiles.rules = concatMap (x: [ "d ${x} 0755 druid druid" ]) (cfg.commonTmpDirs ++ tmpDirs);
130 };
131 networking.firewall.allowedTCPPorts = mkIf (attrByPath [
132 "openFirewall"
133 ] false serviceOptions) allowedTCPPorts;
134
135 users = {
136 users.druid = {
137 description = "Druid user";
138 group = "druid";
139 isNormalUser = true;
140 };
141 groups.druid = { };
142 };
143 }
144 extraConfig
145 ]));
146in
147{
148 options.services.druid = {
149 package = mkPackageOption pkgs "apache-druid" { default = [ "druid" ]; };
150
151 commonConfig = mkOption {
152 default = { };
153
154 type = types.attrsOf types.anything;
155
156 description = "(key=value) Configuration to be written to common.runtime.properties";
157
158 example = {
159 "druid.zk.service.host" = "localhost:2181";
160 "druid.metadata.storage.type" = "mysql";
161 "druid.metadata.storage.connector.connectURI" = "jdbc:mysql://localhost:3306/druid";
162 "druid.extensions.loadList" = ''[ "mysql-metadata-storage" ]'';
163 };
164 };
165
166 commonTmpDirs = mkOption {
167 default = [ "/var/log/druid/requests" ];
168 type = types.listOf types.str;
169 description = "Common List of directories used by druid processes";
170 };
171
172 log4j = mkOption {
173 type = types.path;
174 description = "Log4j Configuration for the druid process";
175 };
176
177 extraClassPaths = mkOption {
178 default = [ ];
179 type = types.listOf types.str;
180 description = "Extra classpath to include in the jvm";
181 };
182
183 extraConfDirs = mkOption {
184 default = [ ];
185 type = types.listOf types.path;
186 description = "Extra Conf Dirs to include in the jvm";
187 };
188
189 overlord = druidServiceOption "Druid Overlord";
190
191 coordinator = druidServiceOption "Druid Coordinator";
192
193 broker = druidServiceOption "Druid Broker";
194
195 historical = (druidServiceOption "Druid Historical") // {
196 segmentLocations = mkOption {
197
198 default = null;
199
200 description = "Locations where the historical will store its data.";
201
202 type =
203 with types;
204 nullOr (
205 listOf (submodule {
206 options = {
207 path = mkOption {
208 type = path;
209 description = "the path to store the segments";
210 };
211
212 maxSize = mkOption {
213 type = str;
214 description = "Max size the druid historical can occupy";
215 };
216
217 freeSpacePercent = mkOption {
218 type = float;
219 default = 1.0;
220 description = "Druid Historical will fail to write if it exceeds this value";
221 };
222 };
223 })
224 );
225
226 };
227 };
228
229 middleManager = druidServiceOption "Druid middleManager";
230 router = druidServiceOption "Druid Router";
231 };
232 config = mkMerge [
233 (druidServiceConfig rec {
234 name = "overlord";
235 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8090 cfg."${name}".config) ];
236 })
237
238 (druidServiceConfig rec {
239 name = "coordinator";
240 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8081 cfg."${name}".config) ];
241 })
242
243 (druidServiceConfig rec {
244 name = "broker";
245
246 tmpDirs = [ (attrByPath [ "druid.lookup.snapshotWorkingDir" ] "" cfg."${name}".config) ];
247
248 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8082 cfg."${name}".config) ];
249 })
250
251 (druidServiceConfig rec {
252 name = "historical";
253
254 tmpDirs = [
255 (attrByPath [ "druid.lookup.snapshotWorkingDir" ] "" cfg."${name}".config)
256 ] ++ (map (x: x.path) cfg."${name}".segmentLocations);
257
258 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8083 cfg."${name}".config) ];
259
260 extraConfig.services.druid.historical.internalConfig."druid.segmentCache.locations" =
261 builtins.toJSON cfg.historical.segmentLocations;
262 })
263
264 (druidServiceConfig rec {
265 name = "middleManager";
266
267 tmpDirs = [
268 "/var/log/druid/indexer"
269 ] ++ [ (attrByPath [ "druid.indexer.task.baseTaskDir" ] "" cfg."${name}".config) ];
270
271 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8091 cfg."${name}".config) ];
272
273 extraConfig = {
274 services.druid.middleManager.internalConfig = {
275 "druid.indexer.runner.javaCommand" = "${cfg.middleManager.jdk}/bin/java";
276 "druid.indexer.runner.javaOpts" =
277 (attrByPath [ "druid.indexer.runner.javaOpts" ] "" cfg.middleManager.config)
278 + " -Dlog4j.configurationFile=file:${cfg.log4j}";
279 };
280
281 networking.firewall.allowedTCPPortRanges = mkIf cfg.middleManager.openFirewall [
282 {
283 from = attrByPath [ "druid.indexer.runner.startPort" ] 8100 cfg.middleManager.config;
284 to = attrByPath [ "druid.indexer.runner.endPort" ] 65535 cfg.middleManager.config;
285 }
286 ];
287 };
288 })
289
290 (druidServiceConfig rec {
291 name = "router";
292
293 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8888 cfg."${name}".config) ];
294 })
295 ];
296
297}