nixos/hadoop: Add HA capabilities

- Add HDFS journalnode and ZKFC services
- Test failover of HDFS and YARN master services in full hadoop test
- Check if a minimal HDFS cluster works in the minimal HDFS test

Changed files
+282 -61
nixos
modules
services
cluster
tests
+73 -3
nixos/modules/services/cluster/hadoop/hdfs.nix
···
Whether to run the HDFS NameNode
'';
};
+
formatOnInit = mkOption {
+
type = types.bool;
+
default = false;
+
description = ''
+
Format HDFS namenode on first start. This is useful for quickly spinning up ephemeral HDFS clusters with a single namenode.
+
For HA clusters, initialization involves multiple steps across multiple nodes. Follow [this guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html)
+
to initialize an HA cluster manually.
+
'';
+
};
inherit restartIfChanged;
openFirewall = mkOption {
type = types.bool;
···
'';
};
};
+
journalnode = {
+
enabled = mkOption {
+
type = types.bool;
+
default = false;
+
description = ''
+
Whether to run the HDFS JournalNode
+
'';
+
};
+
inherit restartIfChanged;
+
openFirewall = mkOption {
+
type = types.bool;
+
default = true;
+
description = ''
+
Open firewall ports for journalnode
+
'';
+
};
+
};
+
zkfc = {
+
enabled = mkOption {
+
type = types.bool;
+
default = false;
+
description = ''
+
Whether to run the HDFS ZooKeeper failover controller
+
'';
+
};
+
inherit restartIfChanged;
+
};
};
config = mkMerge [
···
wantedBy = [ "multi-user.target" ];
inherit (cfg.hdfs.namenode) restartIfChanged;
-
preStart = ''
+
preStart = (mkIf cfg.hdfs.namenode.formatOnInit ''
${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true
-
'';
+
'');
serviceConfig = {
User = "hdfs";
···
networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.namenode.openFirewall [
9870 # namenode.http-address
8020 # namenode.rpc-address
+
8022 # namenode. servicerpc-address
]);
})
(mkIf cfg.hdfs.datanode.enabled {
···
9867 # datanode.ipc.address
]);
})
+
(mkIf cfg.hdfs.journalnode.enabled {
+
systemd.services.hdfs-journalnode = {
+
description = "Hadoop HDFS JournalNode";
+
wantedBy = [ "multi-user.target" ];
+
inherit (cfg.hdfs.journalnode) restartIfChanged;
+
+
serviceConfig = {
+
User = "hdfs";
+
SyslogIdentifier = "hdfs-journalnode";
+
ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} journalnode";
+
Restart = "always";
+
};
+
};
+
+
networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.datanode.openFirewall [
+
8480 # dfs.journalnode.http-address
+
8485 # dfs.journalnode.rpc-address
+
]);
+
})
+
(mkIf cfg.hdfs.zkfc.enabled {
+
systemd.services.hdfs-zkfc = {
+
description = "Hadoop HDFS ZooKeeper failover controller";
+
wantedBy = [ "multi-user.target" ];
+
inherit (cfg.hdfs.zkfc) restartIfChanged;
+
+
serviceConfig = {
+
User = "hdfs";
+
SyslogIdentifier = "hdfs-zkfc";
+
ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} zkfc";
+
Restart = "always";
+
};
+
};
+
})
(mkIf (
-
cfg.hdfs.namenode.enabled || cfg.hdfs.datanode.enabled
+
cfg.hdfs.namenode.enabled || cfg.hdfs.datanode.enabled || cfg.hdfs.journalnode.enabled || cfg.hdfs.zkfc.enabled
) {
users.users.hdfs = {
description = "Hadoop HDFS user";
+1
nixos/modules/services/cluster/hadoop/yarn.nix
···
8030 # resourcemanager.scheduler.address
8031 # resourcemanager.resource-tracker.address
8032 # resourcemanager.address
+
8033 # resourcemanager.admin.address
]);
})
+198 -38
nixos/tests/hadoop/hadoop.nix
···
+
# This test is very comprehensive. It tests whether all hadoop services work well with each other.
+
# Run this when updating the Hadoop package or making significant changes to the hadoop module.
+
# For a more basic test, see hdfs.nix and yarn.nix
import ../make-test-python.nix ({pkgs, ...}: {
nodes = let
package = pkgs.hadoop;
coreSite = {
-
"fs.defaultFS" = "hdfs://master";
+
"fs.defaultFS" = "hdfs://ns1";
+
};
+
hdfsSite = {
+
"dfs.namenode.rpc-bind-host" = "0.0.0.0";
+
"dfs.namenode.http-bind-host" = "0.0.0.0";
+
"dfs.namenode.servicerpc-bind-host" = "0.0.0.0";
+
+
# HA Quorum Journal Manager configuration
+
"dfs.nameservices" = "ns1";
+
"dfs.ha.namenodes.ns1" = "nn1,nn2";
+
"dfs.namenode.shared.edits.dir.ns1.nn1" = "qjournal://jn1:8485;jn2:8485;jn3:8485/ns1";
+
"dfs.namenode.shared.edits.dir.ns1.nn2" = "qjournal://jn1:8485;jn2:8485;jn3:8485/ns1";
+
"dfs.namenode.rpc-address.ns1.nn1" = "nn1:8020";
+
"dfs.namenode.rpc-address.ns1.nn2" = "nn2:8020";
+
"dfs.namenode.servicerpc-address.ns1.nn1" = "nn1:8022";
+
"dfs.namenode.servicerpc-address.ns1.nn2" = "nn2:8022";
+
"dfs.namenode.http-address.ns1.nn1" = "nn1:9870";
+
"dfs.namenode.http-address.ns1.nn2" = "nn2:9870";
+
+
# Automatic failover configuration
+
"dfs.client.failover.proxy.provider.ns1" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
+
"dfs.ha.automatic-failover.enabled.ns1" = "true";
+
"dfs.ha.fencing.methods" = "shell(true)";
+
"ha.zookeeper.quorum" = "zk1:2181";
+
};
+
yarnSiteHA = {
+
"yarn.resourcemanager.zk-address" = "zk1:2181";
+
"yarn.resourcemanager.ha.enabled" = "true";
+
"yarn.resourcemanager.ha.rm-ids" = "rm1,rm2";
+
"yarn.resourcemanager.hostname.rm1" = "rm1";
+
"yarn.resourcemanager.hostname.rm2" = "rm2";
+
"yarn.resourcemanager.ha.automatic-failover.enabled" = "true";
+
"yarn.resourcemanager.cluster-id" = "cluster1";
+
# yarn.resourcemanager.webapp.address needs to be defined even though yarn.resourcemanager.hostname is set. This shouldn't be necessary, but there's a bug in
+
# hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/amfilter/AmFilterInitializer.java:70
+
# that causes AM containers to fail otherwise.
+
"yarn.resourcemanager.webapp.address.rm1" = "rm1:8088";
+
"yarn.resourcemanager.webapp.address.rm2" = "rm2:8088";
};
in {
-
master = {pkgs, options, ...}: {
+
zk1 = { ... }: {
+
services.zookeeper.enable = true;
+
networking.firewall.allowedTCPPorts = [ 2181 ];
+
};
+
+
# HDFS cluster
+
nn1 = {pkgs, options, ...}: {
services.hadoop = {
-
inherit package coreSite;
+
inherit package coreSite hdfsSite;
hdfs.namenode.enabled = true;
-
yarn.resourcemanager.enabled = true;
+
hdfs.zkfc.enabled = true;
};
-
virtualisation.memorySize = 1024;
+
};
+
nn2 = {pkgs, options, ...}: {
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
hdfs.namenode.enabled = true;
+
hdfs.zkfc.enabled = true;
+
};
};
-
worker = {pkgs, options, ...}: {
+
jn1 = {pkgs, options, ...}: {
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
hdfs.journalnode.enabled = true;
+
};
+
};
+
jn2 = {pkgs, options, ...}: {
services.hadoop = {
-
inherit package coreSite;
+
inherit package coreSite hdfsSite;
+
hdfs.journalnode.enabled = true;
+
};
+
};
+
jn3 = {pkgs, options, ...}: {
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
hdfs.journalnode.enabled = true;
+
};
+
};
+
+
dn1 = {pkgs, options, ...}: {
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
hdfs.datanode.enabled = true;
+
};
+
};
+
+
# YARN cluster
+
rm1 = {pkgs, options, ...}: {
+
virtualisation.memorySize = 1024;
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+
yarn.resourcemanager.enabled = true;
+
};
+
};
+
rm2 = {pkgs, options, ...}: {
+
virtualisation.memorySize = 1024;
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+
yarn.resourcemanager.enabled = true;
+
};
+
};
+
nm1 = {pkgs, options, ...}: {
+
virtualisation.memorySize = 2048;
+
services.hadoop = {
+
inherit package coreSite hdfsSite;
+
yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
yarn.nodemanager.enabled = true;
-
yarnSite = options.services.hadoop.yarnSite.default // {
-
"yarn.resourcemanager.hostname" = "master";
-
};
};
-
virtualisation.memorySize = 2048;
};
};
testScript = ''
start_all()
-
master.wait_for_unit("network.target")
-
master.wait_for_unit("hdfs-namenode")
+
#### HDFS tests ####
-
master.wait_for_open_port(8020)
-
master.wait_for_open_port(9870)
+
zk1.wait_for_unit("network.target")
+
jn1.wait_for_unit("network.target")
+
jn2.wait_for_unit("network.target")
+
jn3.wait_for_unit("network.target")
+
nn1.wait_for_unit("network.target")
+
nn2.wait_for_unit("network.target")
+
dn1.wait_for_unit("network.target")
-
worker.wait_for_unit("network.target")
-
worker.wait_for_unit("hdfs-datanode")
-
worker.wait_for_open_port(9864)
-
worker.wait_for_open_port(9866)
-
worker.wait_for_open_port(9867)
+
zk1.wait_for_unit("zookeeper")
+
jn1.wait_for_unit("hdfs-journalnode")
+
jn2.wait_for_unit("hdfs-journalnode")
+
jn3.wait_for_unit("hdfs-journalnode")
-
master.succeed("curl -f http://worker:9864")
-
worker.succeed("curl -f http://master:9870")
+
zk1.wait_for_open_port(2181)
+
jn1.wait_for_open_port(8480)
+
jn1.wait_for_open_port(8485)
+
jn2.wait_for_open_port(8480)
+
jn2.wait_for_open_port(8485)
-
worker.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+
# Namenodes must be stopped before initializing the cluster
+
nn1.succeed("systemctl stop hdfs-namenode")
+
nn2.succeed("systemctl stop hdfs-namenode")
+
nn1.succeed("systemctl stop hdfs-zkfc")
+
nn2.succeed("systemctl stop hdfs-zkfc")
-
master.wait_for_unit("yarn-resourcemanager")
+
# Initialize zookeeper for failover controller
+
nn1.succeed("sudo -u hdfs hdfs zkfc -formatZK 2>&1 | systemd-cat")
-
master.wait_for_open_port(8030)
-
master.wait_for_open_port(8031)
-
master.wait_for_open_port(8032)
-
master.wait_for_open_port(8088)
-
worker.succeed("curl -f http://master:8088")
+
# Format NN1 and start it
+
nn1.succeed("sudo -u hdfs hadoop namenode -format 2>&1 | systemd-cat")
+
nn1.succeed("systemctl start hdfs-namenode")
+
nn1.wait_for_open_port(9870)
+
nn1.wait_for_open_port(8022)
+
nn1.wait_for_open_port(8020)
-
worker.wait_for_unit("yarn-nodemanager")
-
worker.wait_for_open_port(8042)
-
worker.wait_for_open_port(8040)
-
master.succeed("curl -f http://worker:8042")
+
# Bootstrap NN2 from NN1 and start it
+
nn2.succeed("sudo -u hdfs hdfs namenode -bootstrapStandby 2>&1 | systemd-cat")
+
nn2.succeed("systemctl start hdfs-namenode")
+
nn2.wait_for_open_port(9870)
+
nn2.wait_for_open_port(8022)
+
nn2.wait_for_open_port(8020)
+
nn1.succeed("netstat -tulpne | systemd-cat")
-
assert "Total Nodes:1" in worker.succeed("yarn node -list")
+
# Start failover controllers
+
nn1.succeed("systemctl start hdfs-zkfc")
+
nn2.succeed("systemctl start hdfs-zkfc")
-
assert "Estimated value of Pi is" in worker.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
-
assert "SUCCEEDED" in worker.succeed("yarn application -list -appStates FINISHED")
-
worker.succeed("sudo -u hdfs hdfs dfs -ls / | systemd-cat")
+
# DN should have started by now, but confirm anyway
+
dn1.wait_for_unit("hdfs-datanode")
+
# Print states of namenodes
+
dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+
# Wait for cluster to exit safemode
+
dn1.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+
dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+
# test R/W
+
dn1.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
+
assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
+
+
# Test NN failover
+
nn1.succeed("systemctl stop hdfs-namenode")
+
assert "active" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+
dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+
assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
+
+
nn1.succeed("systemctl start hdfs-namenode")
+
nn1.wait_for_open_port(9870)
+
nn1.wait_for_open_port(8022)
+
nn1.wait_for_open_port(8020)
+
assert "standby" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+
dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+
+
#### YARN tests ####
+
+
rm1.wait_for_unit("network.target")
+
rm2.wait_for_unit("network.target")
+
nm1.wait_for_unit("network.target")
+
+
rm1.wait_for_unit("yarn-resourcemanager")
+
rm1.wait_for_open_port(8088)
+
rm2.wait_for_unit("yarn-resourcemanager")
+
rm2.wait_for_open_port(8088)
+
+
nm1.wait_for_unit("yarn-nodemanager")
+
nm1.wait_for_open_port(8042)
+
nm1.wait_for_open_port(8040)
+
nm1.wait_until_succeeds("yarn node -list | grep Nodes:1")
+
nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+
nm1.succeed("sudo -u yarn yarn node -list | systemd-cat")
+
+
# Test RM failover
+
rm1.succeed("systemctl stop yarn-resourcemanager")
+
assert "standby" not in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+
nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+
rm1.succeed("systemctl start yarn-resourcemanager")
+
rm1.wait_for_unit("yarn-resourcemanager")
+
rm1.wait_for_open_port(8088)
+
assert "standby" in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+
nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+
+
assert "Estimated value of Pi is" in nm1.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
+
assert "SUCCEEDED" in nm1.succeed("yarn application -list -appStates FINISHED")
'';
-
})
+
})
+9 -10
nixos/tests/hadoop/hdfs.nix
···
+
# Test a minimal HDFS cluster with no HA
import ../make-test-python.nix ({...}: {
nodes = {
namenode = {pkgs, ...}: {
services.hadoop = {
package = pkgs.hadoop;
-
hdfs.namenode.enabled = true;
+
hdfs.namenode = {
+
enabled = true;
+
formatOnInit = true;
+
};
coreSite = {
"fs.defaultFS" = "hdfs://namenode:8020";
};
···
"dfs.namenode.http-bind-host" = "0.0.0.0";
};
};
-
networking.firewall.allowedTCPPorts = [
-
9870 # namenode.http-address
-
8020 # namenode.rpc-address
-
];
};
datanode = {pkgs, ...}: {
services.hadoop = {
···
"fs.defaultFS" = "hdfs://namenode:8020";
};
};
-
networking.firewall.allowedTCPPorts = [
-
9864 # datanode.http.address
-
9866 # datanode.address
-
9867 # datanode.ipc.address
-
];
};
};
···
namenode.succeed("curl -f http://namenode:9870")
datanode.succeed("curl -f http://datanode:9864")
+
+
datanode.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+
datanode.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
+
assert "testfilecontents" in datanode.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
'';
})
+1 -10
nixos/tests/hadoop/yarn.nix
···
+
# This only tests if YARN is able to start its services
import ../make-test-python.nix ({...}: {
nodes = {
resourcemanager = {pkgs, ...}: {
···
services.hadoop.yarnSite = {
"yarn.resourcemanager.scheduler.class" = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler";
};
-
networking.firewall.allowedTCPPorts = [
-
8088 # resourcemanager.webapp.address
-
8031 # resourcemanager.resource-tracker.address
-
];
};
nodemanager = {pkgs, ...}: {
services.hadoop.package = pkgs.hadoop;
···
services.hadoop.yarnSite = {
"yarn.resourcemanager.hostname" = "resourcemanager";
"yarn.nodemanager.log-dirs" = "/tmp/userlogs";
-
"yarn.nodemanager.address" = "0.0.0.0:8041";
};
-
networking.firewall.allowedTCPPorts = [
-
8042 # nodemanager.webapp.address
-
8041 # nodemanager.address
-
];
};
};
···
nodemanager.wait_for_unit("yarn-nodemanager")
nodemanager.wait_for_unit("network.target")
nodemanager.wait_for_open_port(8042)
-
nodemanager.wait_for_open_port(8041)
resourcemanager.succeed("curl -f http://localhost:8088")
nodemanager.succeed("curl -f http://localhost:8042")