{ config, pkgs, lib, ... }:
let
  cfg = config.services.spark;
in
with lib;
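# NixOS module for Apache Spark in standalone deploy mode: an optional master
# service and an optional worker service that share one Spark package,
# configuration directory and log directory.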
{
  options = {
    services.spark = {
      master = {
        enable = mkEnableOption (lib.mdDoc "Spark master service");
        bind = mkOption {
          type = types.str;
          description = lib.mdDoc "Address the spark master binds to.";
          default = "127.0.0.1";
          example = "0.0.0.0";
        };
        restartIfChanged = mkOption {
          type = types.bool;
          description = lib.mdDoc ''
            Automatically restart the master service on config change.
            This can be set to false to defer restarts on clusters running critical applications.
            Please consider the security implications of inadvertently running an older version,
            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
          '';
          default = true;
        };
        extraEnvironment = mkOption {
          type = types.attrsOf types.str;
          description = lib.mdDoc "Extra environment variables to pass to the spark master. See the Spark standalone documentation.";
          default = {};
          example = {
            SPARK_MASTER_WEBUI_PORT = "8181";
            SPARK_MASTER_OPTS = "-Dspark.deploy.defaultCores=5";
          };
        };
      };
      worker = {
        enable = mkEnableOption (lib.mdDoc "Spark worker service");
        workDir = mkOption {
          type = types.path;
          description = lib.mdDoc "Spark worker working directory.";
          default = "/var/lib/spark";
        };
        master = mkOption {
          type = types.str;
          description = lib.mdDoc "Address of the spark master.";
          default = "127.0.0.1:7077";
        };
        restartIfChanged = mkOption {
          type = types.bool;
          description = lib.mdDoc ''
            Automatically restart the worker service on config change.
            This can be set to false to defer restarts on clusters running critical applications.
            Please consider the security implications of inadvertently running an older version,
            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
          '';
          default = true;
        };
        extraEnvironment = mkOption {
          type = types.attrsOf types.str;
          description = lib.mdDoc "Extra environment variables to pass to the spark worker.";
          default = {};
          example = {
            SPARK_WORKER_CORES = "5";
            SPARK_WORKER_MEMORY = "2g";
          };
        };
      };
      confDir = mkOption {
        type = types.path;
        description = lib.mdDoc "Spark configuration directory. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc.) from this directory.";
        default = "${cfg.package}/lib/${cfg.package.untarDir}/conf";
        defaultText = literalExpression ''"''${package}/lib/''${package.untarDir}/conf"'';
      };
      logDir = mkOption {
        type = types.path;
        description = lib.mdDoc "Spark log directory.";
        default = "/var/log/spark";
      };
      package = mkOption {
        type = types.package;
        description = lib.mdDoc "Spark package.";
        default = pkgs.spark;
        defaultText = literalExpression "pkgs.spark";
        example = literalExpression ''
          pkgs.spark.overrideAttrs (super: rec {
            pname = "spark";
            version = "2.4.4";

            src = pkgs.fetchzip {
              url = "mirror://apache/spark/''${pname}-''${version}/''${pname}-''${version}-bin-without-hadoop.tgz";
              sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
            };
          })
        '';
      };
    };
  };
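  # A minimal usage sketch; the bind address and master host:port below are
  # illustrative values, not additional defaults:
  #
  #   services.spark.master.enable = true;
  #   services.spark.master.bind = "0.0.0.0";
  #   services.spark.worker.enable = true;
  #   services.spark.worker.master = "127.0.0.1:7077";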
  config = lib.mkIf (cfg.worker.enable || cfg.master.enable) {
    environment.systemPackages = [ cfg.package ];
    systemd = {
      services = {
        spark-master = lib.mkIf cfg.master.enable {
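          # The bundled start/stop scripts shell out to tools such as ps and
          # hostname, hence procps, openssh and nettools on the unit's PATH.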
          path = with pkgs; [ procps openssh nettools ];
          description = "spark master service.";
          after = [ "network.target" ];
          wantedBy = [ "multi-user.target" ];
          restartIfChanged = cfg.master.restartIfChanged;
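          # The // update is right-biased, so the module-managed variables
          # below override any identically named keys from extraEnvironment.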
          environment = cfg.master.extraEnvironment // {
            SPARK_MASTER_HOST = cfg.master.bind;
            SPARK_CONF_DIR = cfg.confDir;
            SPARK_LOG_DIR = cfg.logDir;
          };
          serviceConfig = {
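            # start-master.sh backgrounds the daemon and exits, so the unit
            # must be a forking service.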
            Type = "forking";
            User = "spark";
            Group = "spark";
            WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
            ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-master.sh";
            ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-master.sh";
            TimeoutSec = 300;
            StartLimitBurst = 10;
            Restart = "always";
          };
        };
        spark-worker = lib.mkIf cfg.worker.enable {
          path = with pkgs; [ procps openssh nettools rsync ];
          description = "spark worker service.";
          after = [ "network.target" ];
          wantedBy = [ "multi-user.target" ];
          restartIfChanged = cfg.worker.restartIfChanged;
          environment = cfg.worker.extraEnvironment // {
            SPARK_MASTER = cfg.worker.master;
            SPARK_CONF_DIR = cfg.confDir;
            SPARK_LOG_DIR = cfg.logDir;
            SPARK_WORKER_DIR = cfg.worker.workDir;
          };
          serviceConfig = {
            Type = "forking";
            User = "spark";
            WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
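            # start-worker.sh expects the master URL as its argument; the
            # spark:// scheme is prepended to the configured host:port here.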
            ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-worker.sh spark://${cfg.worker.master}";
            ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-worker.sh";
            TimeoutSec = 300;
            StartLimitBurst = 10;
            Restart = "always";
          };
        };
      };
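      # Have systemd-tmpfiles create the work and log directories at boot,
      # owned by the spark user ("-" keeps the default mode).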
      tmpfiles.rules = [
        "d '${cfg.worker.workDir}' - spark spark - -"
        "d '${cfg.logDir}' - spark spark - -"
      ];
    };
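    # Both services run as a dedicated unprivileged system user.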
    users = {
      users.spark = {
        description = "spark user.";
        group = "spark";
        isSystemUser = true;
      };
      groups.spark = { };
    };
  };
}