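# NixOS module for the Hadoop HDFS services: NameNode, DataNode, JournalNode,
# ZKFC and HTTPFS. Each enabled role gets a systemd unit ("hdfs-<role>"),
# optional firewall openings, and the user it runs as (hdfs, or httpfs for HTTPFS).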
{ config, lib, pkgs, ... }:
with lib;
let
  cfg = config.services.hadoop;

  # Config files for hadoop services
  hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";

  # Generator for HDFS service options
  hadoopServiceOption = { serviceName, firewallOption ? true, extraOpts ? null }: {
    enable = mkEnableOption (lib.mdDoc serviceName);
    restartIfChanged = mkOption {
      type = types.bool;
      description = lib.mdDoc ''
        Automatically restart the service on config change.
        This can be set to false to defer restarts on clusters running critical applications.
        Please consider the security implications of inadvertently running an older version,
        and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
      '';
      default = false;
    };
    extraFlags = mkOption {
      type = with types; listOf str;
      default = [];
      description = lib.mdDoc "Extra command line flags to pass to ${serviceName}";
      example = [
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.port=8010"
      ];
    };
    extraEnv = mkOption {
      type = with types; attrsOf str;
      default = {};
      description = lib.mdDoc "Extra environment variables for ${serviceName}";
    };
  } // (optionalAttrs firewallOption {
    openFirewall = mkOption {
      type = types.bool;
      default = false;
      description = lib.mdDoc "Open firewall ports for ${serviceName}.";
    };
  }) // (optionalAttrs (extraOpts != null) extraOpts);

  # Generator for HDFS service configs
  hadoopServiceConfig =
    { name
    , serviceOptions ? cfg.hdfs."${toLower name}"
    , description ? "Hadoop HDFS ${name}"
    , User ? "hdfs"
    , allowedTCPPorts ? [ ]
    , preStart ? ""
    , environment ? { }
    , extraConfig ? { }
    }: (

      mkIf serviceOptions.enable (mkMerge [{
        systemd.services."hdfs-${toLower name}" = {
          inherit description preStart;
          environment = environment // serviceOptions.extraEnv;
          wantedBy = [ "multi-user.target" ];
          inherit (serviceOptions) restartIfChanged;
          serviceConfig = {
            inherit User;
            SyslogIdentifier = "hdfs-${toLower name}";
            ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} ${toLower name} ${escapeShellArgs serviceOptions.extraFlags}";
            Restart = "always";
          };
        };

        services.hadoop.gatewayRole.enable = true;

        networking.firewall.allowedTCPPorts = mkIf
          ((builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall)
          allowedTCPPorts;
      } extraConfig])
    );

in
{
  options.services.hadoop.hdfs = {

    namenode = hadoopServiceOption { serviceName = "HDFS NameNode"; } // {
      formatOnInit = mkOption {
        type = types.bool;
        default = false;
        description = lib.mdDoc ''
          Format HDFS namenode on first start. This is useful for quickly spinning up
          ephemeral HDFS clusters with a single namenode.
          For HA clusters, initialization involves multiple steps across multiple nodes.
          Follow this guide to initialize an HA cluster manually:
          <https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html>
        '';
      };
    };

    datanode = hadoopServiceOption { serviceName = "HDFS DataNode"; } // {
      dataDirs = mkOption {
        default = null;
        description = lib.mdDoc "Tier and path definitions for datanode storage.";
        type = with types; nullOr (listOf (submodule {
          options = {
            type = mkOption {
              type = enum [ "SSD" "DISK" "ARCHIVE" "RAM_DISK" ];
              description = lib.mdDoc ''
                Storage types ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for HDFS storage policies.
              '';
            };
            path = mkOption {
              type = path;
              example = "/var/lib/hadoop/hdfs/dn";
              description = lib.mdDoc "Determines where on the local filesystem a data node should store its blocks.";
            };
          };
        }));
      };
    };

    journalnode = hadoopServiceOption { serviceName = "HDFS JournalNode"; };

    zkfc = hadoopServiceOption {
      serviceName = "HDFS ZooKeeper failover controller";
      firewallOption = false;
    };

    httpfs = hadoopServiceOption { serviceName = "HDFS HTTPFS"; } // {
      tempPath = mkOption {
        type = types.path;
        default = "/tmp/hadoop/httpfs";
        description = lib.mdDoc "HTTPFS_TEMP path used by HTTPFS";
      };
    };

  };

  config = mkMerge [
    (hadoopServiceConfig {
      name = "NameNode";
      allowedTCPPorts = [
        9870 # namenode.http-address
        8020 # namenode.rpc-address
        8022 # namenode.servicerpc-address
        8019 # dfs.ha.zkfc.port
      ];
      preStart = (mkIf cfg.hdfs.namenode.formatOnInit
        "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true"
      );
    })

    (hadoopServiceConfig {
      name = "DataNode";
      # port numbers for datanode changed between hadoop 2 and 3
      allowedTCPPorts = if versionAtLeast cfg.package.version "3" then [
        9864 # datanode.http.address
        9866 # datanode.address
        9867 # datanode.ipc.address
      ] else [
        50075 # datanode.http.address
        50010 # datanode.address
        50020 # datanode.ipc.address
      ];
      extraConfig.services.hadoop.hdfsSiteInternal."dfs.datanode.data.dir" = mkIf (cfg.hdfs.datanode.dataDirs != null)
        (concatMapStringsSep "," (x: "[" + x.type + "]file://" + x.path) cfg.hdfs.datanode.dataDirs);
    })

    (hadoopServiceConfig {
      name = "JournalNode";
      allowedTCPPorts = [
        8480 # dfs.journalnode.http-address
        8485 # dfs.journalnode.rpc-address
      ];
    })

    (hadoopServiceConfig {
      name = "zkfc";
      description = "Hadoop HDFS ZooKeeper failover controller";
    })

    (hadoopServiceConfig {
      name = "HTTPFS";
      environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
      preStart = "mkdir -p $HTTPFS_TEMP";
      User = "httpfs";
      allowedTCPPorts = [
        14000 # httpfs.http.port
      ];
    })

    (mkIf cfg.gatewayRole.enable {
      users.users.hdfs = {
        description = "Hadoop HDFS user";
        group = "hadoop";
        uid = config.ids.uids.hdfs;
      };
    })
    (mkIf cfg.hdfs.httpfs.enable {
      users.users.httpfs = {
        description = "Hadoop HTTPFS user";
        group = "hadoop";
        isSystemUser = true;
      };
    })

  ];
}
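
# Example usage (sketch): a minimal single-node HDFS setup built on the options
# declared above. The hdfs.* options are defined in this file; "services.hadoop.coreSite"
# is assumed to be declared by the companion module (default.nix) of the hadoop service.
#
#   services.hadoop = {
#     coreSite."fs.defaultFS" = "hdfs://localhost";
#     hdfs.namenode = {
#       enable = true;
#       formatOnInit = true;   # format the namenode on first start
#       openFirewall = true;   # opens 9870/8020/8022/8019 (see allowedTCPPorts above)
#     };
#     hdfs.datanode = {
#       enable = true;
#       dataDirs = [ { type = "DISK"; path = "/var/lib/hadoop/hdfs/dn"; } ];
#     };
#   };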