at 25.11-pre 8.9 kB view raw
1{ 2 config, 3 lib, 4 pkgs, 5 ... 6}: 7let 8 cfg = config.services.druid; 9 inherit (lib) 10 concatStrings 11 concatStringsSep 12 mapAttrsToList 13 concatMap 14 attrByPath 15 mkIf 16 mkMerge 17 mkEnableOption 18 mkOption 19 types 20 mkPackageOption 21 ; 22 23 druidServiceOption = serviceName: { 24 enable = mkEnableOption serviceName; 25 26 restartIfChanged = mkOption { 27 type = types.bool; 28 description = '' 29 Automatically restart the service on config change. 30 This can be set to false to defer restarts on clusters running critical applications. 31 Please consider the security implications of inadvertently running an older version, 32 and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option. 33 ''; 34 default = false; 35 }; 36 37 config = mkOption { 38 default = { }; 39 type = types.attrsOf types.anything; 40 description = '' 41 (key=value) Configuration to be written to runtime.properties of the druid ${serviceName} 42 <https://druid.apache.org/docs/latest/configuration/index.html> 43 ''; 44 example = { 45 "druid.plainTextPort" = "8082"; 46 "druid.service" = "servicename"; 47 }; 48 }; 49 50 jdk = mkPackageOption pkgs "JDK" { default = [ "jdk17_headless" ]; }; 51 52 jvmArgs = mkOption { 53 type = types.str; 54 default = ""; 55 description = "Arguments to pass to the JVM"; 56 }; 57 58 openFirewall = mkOption { 59 type = types.bool; 60 default = false; 61 description = "Open firewall ports for ${serviceName}."; 62 }; 63 64 internalConfig = mkOption { 65 default = { }; 66 type = types.attrsOf types.anything; 67 internal = true; 68 description = "Internal Option to add to runtime.properties for ${serviceName}."; 69 }; 70 }; 71 72 druidServiceConfig = 73 { 74 name, 75 serviceOptions ? cfg."${name}", 76 allowedTCPPorts ? [ ], 77 tmpDirs ? [ ], 78 extraConfig ? { }, 79 }: 80 (mkIf serviceOptions.enable (mkMerge [ 81 { 82 systemd = { 83 services."druid-${name}" = { 84 after = [ "network.target" ]; 85 86 description = "Druid ${name}"; 87 88 wantedBy = [ "multi-user.target" ]; 89 90 inherit (serviceOptions) restartIfChanged; 91 92 path = [ 93 cfg.package 94 serviceOptions.jdk 95 ]; 96 97 script = 98 let 99 cfgFile = 100 fileName: properties: 101 pkgs.writeTextDir fileName ( 102 concatStringsSep "\n" (mapAttrsToList (n: v: "${n}=${toString v}") properties) 103 ); 104 105 commonConfigFile = cfgFile "common.runtime.properties" cfg.commonConfig; 106 107 configFile = cfgFile "runtime.properties" (serviceOptions.config // serviceOptions.internalConfig); 108 109 extraClassPath = concatStrings (map (path: ":" + path) cfg.extraClassPaths); 110 111 extraConfDir = concatStrings (map (dir: ":" + dir + "/*") cfg.extraConfDirs); 112 in 113 '' 114 run-java -Dlog4j.configurationFile=file:${cfg.log4j} \ 115 -Ddruid.extensions.directory=${cfg.package}/extensions \ 116 -Ddruid.extensions.hadoopDependenciesDir=${cfg.package}/hadoop-dependencies \ 117 -classpath ${commonConfigFile}:${configFile}:${cfg.package}/lib/\*${extraClassPath}${extraConfDir} \ 118 ${serviceOptions.jvmArgs} \ 119 org.apache.druid.cli.Main server ${name} 120 ''; 121 122 serviceConfig = { 123 User = "druid"; 124 SyslogIdentifier = "druid-${name}"; 125 Restart = "always"; 126 }; 127 }; 128 129 tmpfiles.rules = concatMap (x: [ "d ${x} 0755 druid druid" ]) (cfg.commonTmpDirs ++ tmpDirs); 130 }; 131 networking.firewall.allowedTCPPorts = mkIf (attrByPath [ 132 "openFirewall" 133 ] false serviceOptions) allowedTCPPorts; 134 135 users = { 136 users.druid = { 137 description = "Druid user"; 138 group = "druid"; 139 isNormalUser = true; 140 }; 141 groups.druid = { }; 142 }; 143 } 144 extraConfig 145 ])); 146in 147{ 148 options.services.druid = { 149 package = mkPackageOption pkgs "apache-druid" { default = [ "druid" ]; }; 150 151 commonConfig = mkOption { 152 default = { }; 153 154 type = types.attrsOf types.anything; 155 156 description = "(key=value) Configuration to be written to common.runtime.properties"; 157 158 example = { 159 "druid.zk.service.host" = "localhost:2181"; 160 "druid.metadata.storage.type" = "mysql"; 161 "druid.metadata.storage.connector.connectURI" = "jdbc:mysql://localhost:3306/druid"; 162 "druid.extensions.loadList" = ''[ "mysql-metadata-storage" ]''; 163 }; 164 }; 165 166 commonTmpDirs = mkOption { 167 default = [ "/var/log/druid/requests" ]; 168 type = types.listOf types.str; 169 description = "Common List of directories used by druid processes"; 170 }; 171 172 log4j = mkOption { 173 type = types.path; 174 description = "Log4j Configuration for the druid process"; 175 }; 176 177 extraClassPaths = mkOption { 178 default = [ ]; 179 type = types.listOf types.str; 180 description = "Extra classpath to include in the jvm"; 181 }; 182 183 extraConfDirs = mkOption { 184 default = [ ]; 185 type = types.listOf types.path; 186 description = "Extra Conf Dirs to include in the jvm"; 187 }; 188 189 overlord = druidServiceOption "Druid Overlord"; 190 191 coordinator = druidServiceOption "Druid Coordinator"; 192 193 broker = druidServiceOption "Druid Broker"; 194 195 historical = (druidServiceOption "Druid Historical") // { 196 segmentLocations = mkOption { 197 198 default = null; 199 200 description = "Locations where the historical will store its data."; 201 202 type = 203 with types; 204 nullOr ( 205 listOf (submodule { 206 options = { 207 path = mkOption { 208 type = path; 209 description = "the path to store the segments"; 210 }; 211 212 maxSize = mkOption { 213 type = str; 214 description = "Max size the druid historical can occupy"; 215 }; 216 217 freeSpacePercent = mkOption { 218 type = float; 219 default = 1.0; 220 description = "Druid Historical will fail to write if it exceeds this value"; 221 }; 222 }; 223 }) 224 ); 225 226 }; 227 }; 228 229 middleManager = druidServiceOption "Druid middleManager"; 230 router = druidServiceOption "Druid Router"; 231 }; 232 config = mkMerge [ 233 (druidServiceConfig rec { 234 name = "overlord"; 235 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8090 cfg."${name}".config) ]; 236 }) 237 238 (druidServiceConfig rec { 239 name = "coordinator"; 240 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8081 cfg."${name}".config) ]; 241 }) 242 243 (druidServiceConfig rec { 244 name = "broker"; 245 246 tmpDirs = [ (attrByPath [ "druid.lookup.snapshotWorkingDir" ] "" cfg."${name}".config) ]; 247 248 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8082 cfg."${name}".config) ]; 249 }) 250 251 (druidServiceConfig rec { 252 name = "historical"; 253 254 tmpDirs = [ 255 (attrByPath [ "druid.lookup.snapshotWorkingDir" ] "" cfg."${name}".config) 256 ] ++ (map (x: x.path) cfg."${name}".segmentLocations); 257 258 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8083 cfg."${name}".config) ]; 259 260 extraConfig.services.druid.historical.internalConfig."druid.segmentCache.locations" = 261 builtins.toJSON cfg.historical.segmentLocations; 262 }) 263 264 (druidServiceConfig rec { 265 name = "middleManager"; 266 267 tmpDirs = [ 268 "/var/log/druid/indexer" 269 ] ++ [ (attrByPath [ "druid.indexer.task.baseTaskDir" ] "" cfg."${name}".config) ]; 270 271 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8091 cfg."${name}".config) ]; 272 273 extraConfig = { 274 services.druid.middleManager.internalConfig = { 275 "druid.indexer.runner.javaCommand" = "${cfg.middleManager.jdk}/bin/java"; 276 "druid.indexer.runner.javaOpts" = 277 (attrByPath [ "druid.indexer.runner.javaOpts" ] "" cfg.middleManager.config) 278 + " -Dlog4j.configurationFile=file:${cfg.log4j}"; 279 }; 280 281 networking.firewall.allowedTCPPortRanges = mkIf cfg.middleManager.openFirewall [ 282 { 283 from = attrByPath [ "druid.indexer.runner.startPort" ] 8100 cfg.middleManager.config; 284 to = attrByPath [ "druid.indexer.runner.endPort" ] 65535 cfg.middleManager.config; 285 } 286 ]; 287 }; 288 }) 289 290 (druidServiceConfig rec { 291 name = "router"; 292 293 allowedTCPPorts = [ (attrByPath [ "druid.plaintextPort" ] 8888 cfg."${name}".config) ]; 294 }) 295 ]; 296 297}