{
  config,
  lib,
  pkgs,
  ...
}:
let
  cfg = config.services.hadoop;

  # Config files for hadoop services
  hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";
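  # (conf.nix is assumed to evaluate to a derivation containing the generated
  # Hadoop configuration files; the trailing slash lets the store path be
  # passed directly to `hdfs --config` below.)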

  # Generator for HDFS service options
  hadoopServiceOption =
    {
      serviceName,
      firewallOption ? true,
      extraOpts ? null,
    }:
    {
      enable = lib.mkEnableOption serviceName;
      restartIfChanged = lib.mkOption {
        type = lib.types.bool;
        description = ''
          Automatically restart the service on config change.
          This can be set to false to defer restarts on clusters running critical applications.
          Please consider the security implications of inadvertently running an older version,
          and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
        '';
        default = false;
      };
      extraFlags = lib.mkOption {
        type = with lib.types; listOf str;
        default = [ ];
        description = "Extra command line flags to pass to ${serviceName}";
        example = [
          "-Dcom.sun.management.jmxremote"
          "-Dcom.sun.management.jmxremote.port=8010"
        ];
      };
      extraEnv = lib.mkOption {
        type = with lib.types; attrsOf str;
        default = { };
        description = "Extra environment variables for ${serviceName}";
      };
    }
    // (lib.optionalAttrs firewallOption {
      openFirewall = lib.mkOption {
        type = lib.types.bool;
        default = false;
        description = "Open firewall ports for ${serviceName}.";
      };
    })
    // (lib.optionalAttrs (extraOpts != null) extraOpts);
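
  # Illustrative sketch, not evaluated anywhere: a call such as
  #   hadoopServiceOption { serviceName = "HDFS NameNode"; }
  # yields `enable`, `restartIfChanged`, `extraFlags` and `extraEnv`, plus
  # `openFirewall` because firewallOption defaults to true; passing
  # `firewallOption = false` (as zkfc does below) omits the firewall option,
  # and a non-null extraOpts is merged in last.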

  # Generator for HDFS service configs
  hadoopServiceConfig =
    {
      name,
      serviceOptions ? cfg.hdfs."${lib.toLower name}",
      description ? "Hadoop HDFS ${name}",
      User ? "hdfs",
      allowedTCPPorts ? [ ],
      preStart ? "",
      environment ? { },
      extraConfig ? { },
    }:
    (
      lib.mkIf serviceOptions.enable (
        lib.mkMerge [
          {
            systemd.services."hdfs-${lib.toLower name}" = {
              inherit description preStart;
              environment = environment // serviceOptions.extraEnv;
              wantedBy = [ "multi-user.target" ];
              inherit (serviceOptions) restartIfChanged;
              serviceConfig = {
                inherit User;
                SyslogIdentifier = "hdfs-${lib.toLower name}";
                ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} ${lib.toLower name} ${lib.escapeShellArgs serviceOptions.extraFlags}";
                Restart = "always";
              };
            };

            services.hadoop.gatewayRole.enable = true;

            networking.firewall.allowedTCPPorts = lib.mkIf (
              (builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall
            ) allowedTCPPorts;
          }
          extraConfig
        ]
      )
    );
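
  # Illustrative sketch: hadoopServiceConfig { name = "JournalNode"; } expands,
  # once cfg.hdfs.journalnode.enable is set, to a systemd unit
  # "hdfs-journalnode" whose ExecStart is roughly
  #   ${cfg.package}/bin/hdfs --config <hadoopConf> journalnode <extraFlags>
  # along with the gateway role and any opened firewall ports.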

in
{
  options.services.hadoop.hdfs = {

    namenode = hadoopServiceOption { serviceName = "HDFS NameNode"; } // {
      formatOnInit = lib.mkOption {
        type = lib.types.bool;
        default = false;
        description = ''
          Format HDFS namenode on first start. This is useful for quickly spinning up
          ephemeral HDFS clusters with a single namenode.
          For HA clusters, initialization involves multiple steps across multiple nodes.
          Follow this guide to initialize an HA cluster manually:
          <https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html>
        '';
      };
    };
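
    # A hypothetical single-node test host might set:
    #   services.hadoop.hdfs.namenode = {
    #     enable = true;
    #     formatOnInit = true;
    #     openFirewall = true;
    #   };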

    datanode = hadoopServiceOption { serviceName = "HDFS DataNode"; } // {
      dataDirs = lib.mkOption {
        default = null;
        description = "Tier and path definitions for datanode storage.";
        type =
          with lib.types;
          nullOr (
            listOf (submodule {
              options = {
                type = lib.mkOption {
                  type = enum [
                    "SSD"
                    "DISK"
                    "ARCHIVE"
                    "RAM_DISK"
                  ];
                  description = ''
                    Storage types ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for HDFS storage policies.
                  '';
                };
                path = lib.mkOption {
                  type = path;
                  example = "/var/lib/hadoop/hdfs/dn";
                  description = "Determines where on the local filesystem a data node should store its blocks.";
                };
              };
            })
          );
      };
    };
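
    # For example (hypothetical paths), a tiered layout such as
    #   services.hadoop.hdfs.datanode.dataDirs = [
    #     { type = "SSD"; path = "/mnt/ssd/dn"; }
    #     { type = "DISK"; path = "/mnt/hdd/dn"; }
    #   ];
    # is rendered by the DataNode config below as
    #   dfs.datanode.data.dir = "[SSD]file:///mnt/ssd/dn,[DISK]file:///mnt/hdd/dn"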

    journalnode = hadoopServiceOption { serviceName = "HDFS JournalNode"; };

    zkfc = hadoopServiceOption {
      serviceName = "HDFS ZooKeeper failover controller";
      firewallOption = false;
    };
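
    # zkfc is created with firewallOption = false: its port (8019,
    # dfs.ha.zkfc.port) is opened alongside the namenode ports below instead.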

    httpfs = hadoopServiceOption { serviceName = "HDFS HTTPFS"; } // {
      tempPath = lib.mkOption {
        type = lib.types.path;
        default = "/tmp/hadoop/httpfs";
        description = "HTTPFS_TEMP path used by HTTPFS";
      };
    };

  };

  config = lib.mkMerge [
    (hadoopServiceConfig {
      name = "NameNode";
      allowedTCPPorts = [
        9870 # namenode.http-address
        8020 # namenode.rpc-address
        8022 # namenode.servicerpc-address
        8019 # dfs.ha.zkfc.port
      ];
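      # `hdfs namenode -format -nonInteractive` exits non-zero if the namenode
      # is already formatted, so `|| true` keeps this pre-start step idempotent.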
      preStart = (
        lib.mkIf cfg.hdfs.namenode.formatOnInit "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true"
      );
    })

    (hadoopServiceConfig {
      name = "DataNode";
      # port numbers for datanode changed between hadoop 2 and 3
      allowedTCPPorts =
        if lib.versionAtLeast cfg.package.version "3" then
          [
            9864 # datanode.http.address
            9866 # datanode.address
            9867 # datanode.ipc.address
          ]
        else
          [
            50075 # datanode.http.address
            50010 # datanode.address
            50020 # datanode.ipc.address
          ];
      extraConfig.services.hadoop.hdfsSiteInternal."dfs.datanode.data.dir" = lib.mkIf (
        cfg.hdfs.datanode.dataDirs != null
      ) (lib.concatMapStringsSep "," (x: "[" + x.type + "]file://" + x.path) cfg.hdfs.datanode.dataDirs);
    })

    (hadoopServiceConfig {
      name = "JournalNode";
      allowedTCPPorts = [
        8480 # dfs.journalnode.http-address
        8485 # dfs.journalnode.rpc-address
      ];
    })

    (hadoopServiceConfig {
      name = "zkfc";
      description = "Hadoop HDFS ZooKeeper failover controller";
    })

    (hadoopServiceConfig {
      name = "HTTPFS";
      environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
      preStart = "mkdir -p $HTTPFS_TEMP";
      User = "httpfs";
      allowedTCPPorts = [
        14000 # httpfs.http.port
      ];
    })

    (lib.mkIf cfg.gatewayRole.enable {
      users.users.hdfs = {
        description = "Hadoop HDFS user";
        group = "hadoop";
        uid = config.ids.uids.hdfs;
      };
    })
    (lib.mkIf cfg.hdfs.httpfs.enable {
      users.users.httpfs = {
        description = "Hadoop HTTPFS user";
        group = "hadoop";
        isSystemUser = true;
      };
    })
  ];
}