{
  config,
  lib,
  pkgs,
  ...
}:
let
  cfg = config.services.hadoop;

  # Config files for Hadoop services
  hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";

  # Generator for HDFS service options
  hadoopServiceOption =
    {
      serviceName,
      firewallOption ? true,
      extraOpts ? null,
    }:
    {
      enable = lib.mkEnableOption serviceName;
      restartIfChanged = lib.mkOption {
        type = lib.types.bool;
        description = ''
          Automatically restart the service on config change.
          This can be set to false to defer restarts on clusters running critical applications.
          Please consider the security implications of inadvertently running an older version,
          and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
        '';
        default = false;
      };
      extraFlags = lib.mkOption {
        type = with lib.types; listOf str;
        default = [ ];
        description = "Extra command line flags to pass to ${serviceName}.";
        example = [
          "-Dcom.sun.management.jmxremote"
          "-Dcom.sun.management.jmxremote.port=8010"
        ];
      };
      extraEnv = lib.mkOption {
        type = with lib.types; attrsOf str;
        default = { };
        description = "Extra environment variables for ${serviceName}.";
      };
    }
    // (lib.optionalAttrs firewallOption {
      openFirewall = lib.mkOption {
        type = lib.types.bool;
        default = false;
        description = "Open firewall ports for ${serviceName}.";
      };
    })
    // (lib.optionalAttrs (extraOpts != null) extraOpts);

  # Generator for HDFS service configs
  hadoopServiceConfig =
    {
      name,
      serviceOptions ? cfg.hdfs."${lib.toLower name}",
      description ? "Hadoop HDFS ${name}",
      User ? "hdfs",
      allowedTCPPorts ? [ ],
      preStart ? "",
      environment ? { },
      extraConfig ? { },
    }:
    (
      lib.mkIf serviceOptions.enable (
        lib.mkMerge [
          {
            systemd.services."hdfs-${lib.toLower name}" = {
              inherit description preStart;
              environment = environment // serviceOptions.extraEnv;
              wantedBy = [ "multi-user.target" ];
              inherit (serviceOptions) restartIfChanged;
              serviceConfig = {
                inherit User;
                SyslogIdentifier = "hdfs-${lib.toLower name}";
                ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} ${lib.toLower name} ${lib.escapeShellArgs serviceOptions.extraFlags}";
                Restart = "always";
              };
            };

            # Any node running an HDFS service also takes on the gateway role
            # (client configuration and the hdfs user, defined below).
            services.hadoop.gatewayRole.enable = true;

            networking.firewall.allowedTCPPorts = lib.mkIf (
              (builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall
            ) allowedTCPPorts;
          }
          extraConfig
        ]
      )
    );

in
{
  options.services.hadoop.hdfs = {

    namenode = hadoopServiceOption { serviceName = "HDFS NameNode"; } // {
      formatOnInit = lib.mkOption {
        type = lib.types.bool;
        default = false;
        description = ''
          Format the HDFS namenode on first start. This is useful for quickly spinning up
          ephemeral HDFS clusters with a single namenode.
          For HA clusters, initialization involves multiple steps across multiple nodes.
          Follow this guide to initialize an HA cluster manually:
          <https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html>
        '';
      };
    };

    datanode = hadoopServiceOption { serviceName = "HDFS DataNode"; } // {
      dataDirs = lib.mkOption {
        default = null;
        description = "Tier and path definitions for datanode storage.";
        type =
          with lib.types;
          nullOr (
            listOf (submodule {
              options = {
                type = lib.mkOption {
                  type = enum [
                    "SSD"
                    "DISK"
                    "ARCHIVE"
                    "RAM_DISK"
                  ];
                  description = ''
                    Storage type ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for HDFS storage policies.
                  '';
                };
                path = lib.mkOption {
                  type = path;
                  example = "/var/lib/hadoop/hdfs/dn";
                  description = "Determines where on the local filesystem a data node should store its blocks.";
                };
              };
            })
          );
      };
    };

    journalnode = hadoopServiceOption { serviceName = "HDFS JournalNode"; };

    zkfc = hadoopServiceOption {
      serviceName = "HDFS ZooKeeper failover controller";
      firewallOption = false;
    };

    httpfs = hadoopServiceOption { serviceName = "HDFS HTTPFS"; } // {
      tempPath = lib.mkOption {
        type = lib.types.path;
        default = "/tmp/hadoop/httpfs";
        description = "HTTPFS_TEMP path used by HTTPFS.";
      };
    };

  };

  config = lib.mkMerge [
    (hadoopServiceConfig {
      name = "NameNode";
      allowedTCPPorts = [
        9870 # namenode.http-address
        8020 # namenode.rpc-address
        8022 # namenode.servicerpc-address
        8019 # dfs.ha.zkfc.port
      ];
      # `-format -nonInteractive` exits non-zero if the namenode is already
      # formatted; `|| true` keeps restarts of an initialized cluster from failing.
      preStart = (
        lib.mkIf cfg.hdfs.namenode.formatOnInit "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true"
      );
    })

    (hadoopServiceConfig {
      name = "DataNode";
      # Port numbers for the datanode changed between Hadoop 2 and 3.
      allowedTCPPorts =
        if lib.versionAtLeast cfg.package.version "3" then
          [
            9864 # datanode.http.address
            9866 # datanode.address
            9867 # datanode.ipc.address
          ]
        else
          [
            50075 # datanode.http.address
            50010 # datanode.address
            50020 # datanode.ipc.address
          ];
      # Render dataDirs as the comma-separated "[TYPE]file://path" list expected
      # by dfs.datanode.data.dir.
      extraConfig.services.hadoop.hdfsSiteInternal."dfs.datanode.data.dir" = lib.mkIf (
        cfg.hdfs.datanode.dataDirs != null
      ) (lib.concatMapStringsSep "," (x: "[" + x.type + "]file://" + x.path) cfg.hdfs.datanode.dataDirs);
    })

    (hadoopServiceConfig {
      name = "JournalNode";
      allowedTCPPorts = [
        8480 # dfs.journalnode.http-address
        8485 # dfs.journalnode.rpc-address
      ];
    })

    (hadoopServiceConfig {
      name = "zkfc";
      description = "Hadoop HDFS ZooKeeper failover controller";
    })

    (hadoopServiceConfig {
      name = "HTTPFS";
      environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
      preStart = "mkdir -p $HTTPFS_TEMP";
      User = "httpfs";
      allowedTCPPorts = [
        14000 # httpfs.http.port
      ];
    })

    (lib.mkIf cfg.gatewayRole.enable {
      users.users.hdfs = {
        description = "Hadoop HDFS user";
        group = "hadoop";
        uid = config.ids.uids.hdfs;
      };
    })
    (lib.mkIf cfg.hdfs.httpfs.enable {
      users.users.httpfs = {
        description = "Hadoop HTTPFS user";
        group = "hadoop";
        isSystemUser = true;
      };
    })

  ];
}
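
# A minimal usage sketch (not part of the module itself): a single-node,
# ephemeral HDFS cluster defined from a NixOS configuration. This assumes the
# surrounding Hadoop module is imported and provides `services.hadoop.coreSite`
# and a default `services.hadoop.package`; only the `hdfs.*` options are
# defined in this file.
#
#   services.hadoop = {
#     coreSite."fs.defaultFS" = "hdfs://localhost:8020";
#     hdfs = {
#       namenode = {
#         enable = true;
#         formatOnInit = true; # ephemeral single-namenode clusters only
#         openFirewall = true;
#       };
#       datanode.enable = true;
#     };
#   };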