{
  config,
  pkgs,
  lib,
  ...
}:
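# NixOS module for Apache Spark standalone-mode daemons (master and worker).
#
# A minimal usage sketch, assuming a two-role setup; the hostname
# "master.example.com" is illustrative, not provided by this module:
#
#   services.spark = {
#     master = {
#       enable = true;
#       bind = "0.0.0.0";
#     };
#     worker = {
#       enable = true;
#       master = "master.example.com:7077";
#     };
#   };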
let
  cfg = config.services.spark;
in
{
  options = {
    services.spark = {
      master = {
        enable = lib.mkEnableOption "Spark master service";
        bind = lib.mkOption {
          type = lib.types.str;
          description = "Address the spark master binds to.";
          default = "127.0.0.1";
          example = "0.0.0.0";
        };
        restartIfChanged = lib.mkOption {
          type = lib.types.bool;
          description = ''
            Automatically restart master service on config change.
            This can be set to false to defer restarts on clusters running critical applications.
            Please consider the security implications of inadvertently running an older version,
            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
          '';
          default = true;
        };
        extraEnvironment = lib.mkOption {
          type = lib.types.attrsOf lib.types.str;
          description = "Extra environment variables to pass to spark master. See spark-standalone documentation.";
          default = { };
          example = {
            SPARK_MASTER_WEBUI_PORT = "8181";
            SPARK_MASTER_OPTS = "-Dspark.deploy.defaultCores=5";
          };
        };
      };
      worker = {
        enable = lib.mkEnableOption "Spark worker service";
        workDir = lib.mkOption {
          type = lib.types.path;
          description = "Spark worker work dir.";
          default = "/var/lib/spark";
        };
        master = lib.mkOption {
          type = lib.types.str;
          description = "Address of the spark master.";
          default = "127.0.0.1:7077";
        };
        restartIfChanged = lib.mkOption {
          type = lib.types.bool;
          description = ''
            Automatically restart worker service on config change.
            This can be set to false to defer restarts on clusters running critical applications.
            Please consider the security implications of inadvertently running an older version,
            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
          '';
          default = true;
        };
        extraEnvironment = lib.mkOption {
          type = lib.types.attrsOf lib.types.str;
          description = "Extra environment variables to pass to spark worker.";
          default = { };
          example = {
            SPARK_WORKER_CORES = "5";
            SPARK_WORKER_MEMORY = "2g";
          };
        };
      };
      confDir = lib.mkOption {
        type = lib.types.path;
        description = "Spark configuration directory. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc) from this directory.";
        default = "${cfg.package}/conf";
        defaultText = lib.literalExpression ''"''${package}/conf"'';
      };
      logDir = lib.mkOption {
        type = lib.types.path;
        description = "Spark log directory.";
        default = "/var/log/spark";
      };
      package = lib.mkPackageOption pkgs "spark" {
        example = ''
          spark.overrideAttrs (super: rec {
            pname = "spark";
            version = "2.4.4";

            src = pkgs.fetchzip {
              url = "mirror://apache/spark/''${pname}-''${version}/''${pname}-''${version}-bin-without-hadoop.tgz";
              sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
            };
          })
        '';
      };
    };
  };
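  # Implementation: both daemons run as the dedicated "spark" user, share
  # confDir/logDir, and can be enabled independently or together on one host.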
  config = lib.mkIf (cfg.worker.enable || cfg.master.enable) {
    environment.systemPackages = [ cfg.package ];
    systemd = {
      services = {
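        # Standalone master daemon. Spark's start-master.sh forks and exits,
        # hence Type = "forking" below.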
        spark-master = lib.mkIf cfg.master.enable {
          path = with pkgs; [
            procps
            openssh
            net-tools
          ];
          description = "Spark master service.";
          after = [ "network.target" ];
          wantedBy = [ "multi-user.target" ];
          restartIfChanged = cfg.master.restartIfChanged;
          environment = cfg.master.extraEnvironment // {
            SPARK_MASTER_HOST = cfg.master.bind;
            SPARK_CONF_DIR = cfg.confDir;
            SPARK_LOG_DIR = cfg.logDir;
          };
          serviceConfig = {
            Type = "forking";
            User = "spark";
            Group = "spark";
            WorkingDirectory = "${cfg.package}/";
            ExecStart = "${cfg.package}/sbin/start-master.sh";
            ExecStop = "${cfg.package}/sbin/stop-master.sh";
            TimeoutSec = 300;
            Restart = "always";
          };
          unitConfig = {
            StartLimitBurst = 10;
          };
        };
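        # Worker daemon; it registers with the master given by
        # services.spark.worker.master.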
        spark-worker = lib.mkIf cfg.worker.enable {
          path = with pkgs; [
            procps
            openssh
            net-tools
            rsync
          ];
          description = "Spark worker service.";
          after = [ "network.target" ];
          wantedBy = [ "multi-user.target" ];
          restartIfChanged = cfg.worker.restartIfChanged;
          environment = cfg.worker.extraEnvironment // {
            SPARK_MASTER = cfg.worker.master;
            SPARK_CONF_DIR = cfg.confDir;
            SPARK_LOG_DIR = cfg.logDir;
            SPARK_WORKER_DIR = cfg.worker.workDir;
          };
          serviceConfig = {
            Type = "forking";
            User = "spark";
            WorkingDirectory = "${cfg.package}/";
            ExecStart = "${cfg.package}/sbin/start-worker.sh spark://${cfg.worker.master}";
            ExecStop = "${cfg.package}/sbin/stop-worker.sh";
            TimeoutSec = 300;
            Restart = "always";
          };
          unitConfig = {
            StartLimitBurst = 10;
          };
        };
      };
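      # Create the worker work dir and the log dir at boot, owned by spark:spark.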
      tmpfiles.rules = [
        "d '${cfg.worker.workDir}' - spark spark - -"
        "d '${cfg.logDir}' - spark spark - -"
      ];
    };
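    # Dedicated system user and group shared by the master and worker units.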
    users = {
      users.spark = {
        description = "Spark user.";
        group = "spark";
        isSystemUser = true;
      };
      groups.spark = { };
    };
  };
}