1{ config, lib, pkgs, ... }:
2with lib;
3let
4 cfg = config.services.hadoop;
5
6 # Config files for hadoop services
7 hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";
8
9 # Generator for HDFS service options
10 hadoopServiceOption = { serviceName, firewallOption ? true, extraOpts ? null }: {
11 enable = mkEnableOption (lib.mdDoc serviceName);
12 restartIfChanged = mkOption {
13 type = types.bool;
14 description = lib.mdDoc ''
15 Automatically restart the service on config change.
16 This can be set to false to defer restarts on clusters running critical applications.
17 Please consider the security implications of inadvertently running an older version,
18 and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
19 '';
20 default = false;
21 };
22 extraFlags = mkOption{
23 type = with types; listOf str;
24 default = [];
25 description = lib.mdDoc "Extra command line flags to pass to ${serviceName}";
26 example = [
27 "-Dcom.sun.management.jmxremote"
28 "-Dcom.sun.management.jmxremote.port=8010"
29 ];
30 };
31 extraEnv = mkOption{
32 type = with types; attrsOf str;
33 default = {};
34 description = lib.mdDoc "Extra environment variables for ${serviceName}";
35 };
36 } // (optionalAttrs firewallOption {
37 openFirewall = mkOption {
38 type = types.bool;
39 default = false;
40 description = lib.mdDoc "Open firewall ports for ${serviceName}.";
41 };
42 }) // (optionalAttrs (extraOpts != null) extraOpts);
43
44 # Generator for HDFS service configs
45 hadoopServiceConfig =
46 { name
47 , serviceOptions ? cfg.hdfs."${toLower name}"
48 , description ? "Hadoop HDFS ${name}"
49 , User ? "hdfs"
50 , allowedTCPPorts ? [ ]
51 , preStart ? ""
52 , environment ? { }
53 , extraConfig ? { }
54 }: (
55
56 mkIf serviceOptions.enable ( mkMerge [{
57 systemd.services."hdfs-${toLower name}" = {
58 inherit description preStart;
59 environment = environment // serviceOptions.extraEnv;
60 wantedBy = [ "multi-user.target" ];
61 inherit (serviceOptions) restartIfChanged;
62 serviceConfig = {
63 inherit User;
64 SyslogIdentifier = "hdfs-${toLower name}";
65 ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} ${toLower name} ${escapeShellArgs serviceOptions.extraFlags}";
66 Restart = "always";
67 };
68 };
69
70 services.hadoop.gatewayRole.enable = true;
71
72 networking.firewall.allowedTCPPorts = mkIf
73 ((builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall)
74 allowedTCPPorts;
75 } extraConfig])
76 );
77
78in
79{
80 options.services.hadoop.hdfs = {
81
82 namenode = hadoopServiceOption { serviceName = "HDFS NameNode"; } // {
83 formatOnInit = mkOption {
84 type = types.bool;
85 default = false;
86 description = lib.mdDoc ''
87 Format HDFS namenode on first start. This is useful for quickly spinning up
88 ephemeral HDFS clusters with a single namenode.
89 For HA clusters, initialization involves multiple steps across multiple nodes.
90 Follow this guide to initialize an HA cluster manually:
91 <https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html>
92 '';
93 };
94 };
95
96 datanode = hadoopServiceOption { serviceName = "HDFS DataNode"; } // {
97 dataDirs = mkOption {
98 default = null;
99 description = lib.mdDoc "Tier and path definitions for datanode storage.";
100 type = with types; nullOr (listOf (submodule {
101 options = {
102 type = mkOption {
103 type = enum [ "SSD" "DISK" "ARCHIVE" "RAM_DISK" ];
104 description = lib.mdDoc ''
105 Storage types ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for HDFS storage policies.
106 '';
107 };
108 path = mkOption {
109 type = path;
110 example = [ "/var/lib/hadoop/hdfs/dn" ];
111 description = lib.mdDoc "Determines where on the local filesystem a data node should store its blocks.";
112 };
113 };
114 }));
115 };
116 };
117
118 journalnode = hadoopServiceOption { serviceName = "HDFS JournalNode"; };
119
120 zkfc = hadoopServiceOption {
121 serviceName = "HDFS ZooKeeper failover controller";
122 firewallOption = false;
123 };
124
125 httpfs = hadoopServiceOption { serviceName = "HDFS JournalNode"; } // {
126 tempPath = mkOption {
127 type = types.path;
128 default = "/tmp/hadoop/httpfs";
129 description = lib.mdDoc "HTTPFS_TEMP path used by HTTPFS";
130 };
131 };
132
133 };
134
135 config = mkMerge [
136 (hadoopServiceConfig {
137 name = "NameNode";
138 allowedTCPPorts = [
139 9870 # namenode.http-address
140 8020 # namenode.rpc-address
141 8022 # namenode.servicerpc-address
142 8019 # dfs.ha.zkfc.port
143 ];
144 preStart = (mkIf cfg.hdfs.namenode.formatOnInit
145 "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true"
146 );
147 })
148
149 (hadoopServiceConfig {
150 name = "DataNode";
151 # port numbers for datanode changed between hadoop 2 and 3
152 allowedTCPPorts = if versionAtLeast cfg.package.version "3" then [
153 9864 # datanode.http.address
154 9866 # datanode.address
155 9867 # datanode.ipc.address
156 ] else [
157 50075 # datanode.http.address
158 50010 # datanode.address
159 50020 # datanode.ipc.address
160 ];
161 extraConfig.services.hadoop.hdfsSiteInternal."dfs.datanode.data.dir" = mkIf (cfg.hdfs.datanode.dataDirs!= null)
162 (concatMapStringsSep "," (x: "["+x.type+"]file://"+x.path) cfg.hdfs.datanode.dataDirs);
163 })
164
165 (hadoopServiceConfig {
166 name = "JournalNode";
167 allowedTCPPorts = [
168 8480 # dfs.journalnode.http-address
169 8485 # dfs.journalnode.rpc-address
170 ];
171 })
172
173 (hadoopServiceConfig {
174 name = "zkfc";
175 description = "Hadoop HDFS ZooKeeper failover controller";
176 })
177
178 (hadoopServiceConfig {
179 name = "HTTPFS";
180 environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
181 preStart = "mkdir -p $HTTPFS_TEMP";
182 User = "httpfs";
183 allowedTCPPorts = [
184 14000 # httpfs.http.port
185 ];
186 })
187
188 (mkIf cfg.gatewayRole.enable {
189 users.users.hdfs = {
190 description = "Hadoop HDFS user";
191 group = "hadoop";
192 uid = config.ids.uids.hdfs;
193 };
194 })
195 (mkIf cfg.hdfs.httpfs.enable {
196 users.users.httpfs = {
197 description = "Hadoop HTTPFS user";
198 group = "hadoop";
199 isSystemUser = true;
200 };
201 })
202
203 ];
204}