1{ config, lib, pkgs, ... }:
2
3with lib;
4
5let
6 cfg = config.services.apache-kafka;
7
  # The `javaProperties` generator takes care of various escaping rules and
  # generation of the properties file, but we handle the conversion to strings
  # ourselves in mkPropertyString and stringlySettings, since we know more
  # about the specifically allowed format, e.g. for lists of this type, and we
  # don't want to coerce values down to str too early by having the
  # coercedTypes from javaProperties directly in our NixOS option types.
  #
  # Make sure every `freeformType` and any specific option type in `settings` is
  # supported here.
17
18 mkPropertyString = let
19 render = {
20 bool = boolToString;
21 int = toString;
22 list = concatMapStringsSep "," mkPropertyString;
23 string = id;
24 };
25 in
26 v: render.${builtins.typeOf v} v;
27
28 stringlySettings = mapAttrs (_: mkPropertyString)
29 (filterAttrs (_: v: v != null) cfg.settings);
30
31 generator = (pkgs.formats.javaProperties {}).generate;
32in {
33
34 options.services.apache-kafka = {
35 enable = mkEnableOption "Apache Kafka event streaming broker";
36
37 settings = mkOption {
38 description = ''
39 [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
40 {file}`server.properties`.
41
42 Note that .properties files contain mappings from string to string.
43 Keys with dots are NOT represented by nested attrs in these settings,
44 but instead as quoted strings (ie. `settings."broker.id"`, NOT
45 `settings.broker.id`).
46 '';
47 type = types.submodule {
48 freeformType = with types; let
49 primitive = oneOf [bool int str];
50 in lazyAttrsOf (nullOr (either primitive (listOf primitive)));
51
52 options = {
53 "broker.id" = mkOption {
54 description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
55 default = null;
56 type = with types; nullOr int;
57 };
58
59 "log.dirs" = mkOption {
60 description = "Log file directories.";
61 # Deliberaly leave out old default and use the rewrite opportunity
62 # to have users choose a safer value -- /tmp might be volatile and is a
63 # slightly scary default choice.
64 # default = [ "/tmp/apache-kafka" ];
65 type = with types; listOf path;
66 };
67
68 "listeners" = mkOption {
69 description = ''
70 Kafka Listener List.
71 See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
72 '';
73 type = types.listOf types.str;
74 default = [ "PLAINTEXT://localhost:9092" ];
75 };
76 };
77 };
78 };
79
80 clusterId = mkOption {
81 description = ''
82 KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
83 '';
84 type = with types; nullOr str;
85 default = null;
86 };
87
88 configFiles.serverProperties = mkOption {
89 description = ''
90 Kafka server.properties configuration file path.
91 Defaults to the rendered `settings`.
92 '';
93 type = types.path;
94 };
95
96 configFiles.log4jProperties = mkOption {
97 description = "Kafka log4j property configuration file path";
98 type = types.path;
99 default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
100 defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
101 };
102
103 formatLogDirs = mkOption {
104 description = ''
105 Whether to format log dirs in KRaft mode if all log dirs are
106 unformatted, ie. they contain no meta.properties.
107 '';
108 type = types.bool;
109 default = false;
110 };
111
112 formatLogDirsIgnoreFormatted = mkOption {
113 description = ''
114 Whether to ignore already formatted log dirs when formatting log dirs,
115 instead of failing. Useful when replacing or adding disks.
116 '';
117 type = types.bool;
118 default = false;
119 };
120
121 log4jProperties = mkOption {
122 description = "Kafka log4j property configuration.";
123 default = ''
124 log4j.rootLogger=INFO, stdout
125
126 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
127 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
128 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
129 '';
130 type = types.lines;
131 };
132
133 jvmOptions = mkOption {
134 description = "Extra command line options for the JVM running Kafka.";
135 default = [];
136 type = types.listOf types.str;
137 example = [
138 "-Djava.net.preferIPv4Stack=true"
139 "-Dcom.sun.management.jmxremote"
140 "-Dcom.sun.management.jmxremote.local.only=true"
141 ];
142 };
143
144 package = mkPackageOption pkgs "apacheKafka" { };
145
146 jre = mkOption {
147 description = "The JRE with which to run Kafka";
148 default = cfg.package.passthru.jre;
149 defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
150 type = types.package;
151 };
152 };
153
154 imports = [
155 (mkRenamedOptionModule
156 [ "services" "apache-kafka" "brokerId" ]
157 [ "services" "apache-kafka" "settings" ''broker.id'' ])
158 (mkRenamedOptionModule
159 [ "services" "apache-kafka" "logDirs" ]
160 [ "services" "apache-kafka" "settings" ''log.dirs'' ])
161 (mkRenamedOptionModule
162 [ "services" "apache-kafka" "zookeeper" ]
163 [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])
164
165 (mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
166 "Please see services.apache-kafka.settings.listeners and its documentation instead")
167 (mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
168 "Please see services.apache-kafka.settings.listeners and its documentation instead")
169 (mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
170 "Please see services.apache-kafka.settings and its documentation instead")
171 (mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
172 "Please see services.apache-kafka.settings and its documentation instead")
173 ];
174
175 config = mkIf cfg.enable {
176 services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
177
178 users.users.apache-kafka = {
179 isSystemUser = true;
180 group = "apache-kafka";
181 description = "Apache Kafka daemon user";
182 };
183 users.groups.apache-kafka = {};
184
185 systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";
186
187 systemd.services.apache-kafka = {
188 description = "Apache Kafka Daemon";
189 wantedBy = [ "multi-user.target" ];
190 after = [ "network.target" ];
191 preStart = mkIf cfg.formatLogDirs
192 (if cfg.formatLogDirsIgnoreFormatted then ''
193 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
194 '' else ''
195 if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
196 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
197 fi
198 '');
199 serviceConfig = {
200 ExecStart = ''
201 ${cfg.jre}/bin/java \
202 -cp "${cfg.package}/libs/*" \
203 -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
204 ${toString cfg.jvmOptions} \
205 kafka.Kafka \
206 ${cfg.configFiles.serverProperties}
207 '';
208 User = "apache-kafka";
209 SuccessExitStatus = "0 143";
210 };
211 };
212 };
213
214 meta.doc = ./kafka.md;
215 meta.maintainers = with lib.maintainers; [
216 srhb
217 ];
218}