1{
2 config,
3 lib,
4 pkgs,
5 ...
6}:
7let
  # Shorthand for the evaluated values of this module's own options.
  cfg = config.services.apache-kafka;
9
  # The `javaProperties` generator takes care of the various escaping rules and
  # of generating the properties file, but we handle the conversion to strings
  # ourselves in mkPropertyString and stringlySettings, since we know more
  # about the specifically allowed formats (e.g. for lists of this type), and
  # we don't want to coerce values down to str too early by having the
  # coercedTypes from javaProperties directly in our NixOS option types.
16 #
17 # Make sure every `freeformType` and any specific option type in `settings` is
18 # supported here.
19
20 mkPropertyString =
21 let
22 render = {
23 bool = lib.boolToString;
24 int = toString;
25 list = lib.concatMapStringsSep "," mkPropertyString;
26 string = lib.id;
27 };
28 in
29 v: render.${builtins.typeOf v} v;
30
31 stringlySettings = lib.mapAttrs (_: mkPropertyString) (
32 lib.filterAttrs (_: v: v != null) cfg.settings
33 );
34
35 generator = (pkgs.formats.javaProperties { }).generate;
36in
37{
38
39 options.services.apache-kafka = {
40 enable = lib.mkEnableOption "Apache Kafka event streaming broker";
41
42 settings = lib.mkOption {
43 description = ''
44 [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
45 {file}`server.properties`.
46
47 Note that .properties files contain mappings from string to string.
48 Keys with dots are NOT represented by nested attrs in these settings,
49 but instead as quoted strings (ie. `settings."broker.id"`, NOT
50 `settings.broker.id`).
51 '';
52 type = lib.types.submodule {
53 freeformType =
54 with lib.types;
55 let
56 primitive = oneOf [
57 bool
58 int
59 str
60 ];
61 in
62 lazyAttrsOf (nullOr (either primitive (listOf primitive)));
63
64 options = {
65 "broker.id" = lib.mkOption {
66 description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
67 default = null;
68 type = with lib.types; nullOr int;
69 };
70
71 "log.dirs" = lib.mkOption {
72 description = "Log file directories.";
73 # Deliberaly leave out old default and use the rewrite opportunity
74 # to have users choose a safer value -- /tmp might be volatile and is a
75 # slightly scary default choice.
76 # default = [ "/tmp/apache-kafka" ];
77 type = with lib.types; listOf path;
78 };
79
80 "listeners" = lib.mkOption {
81 description = ''
82 Kafka Listener List.
83 See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
84 '';
85 type = lib.types.listOf lib.types.str;
86 default = [ "PLAINTEXT://localhost:9092" ];
87 };
88 };
89 };
90 };
91
92 clusterId = lib.mkOption {
93 description = ''
94 KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
95 '';
96 type = with lib.types; nullOr str;
97 default = null;
98 };
99
100 configFiles.serverProperties = lib.mkOption {
101 description = ''
102 Kafka server.properties configuration file path.
103 Defaults to the rendered `settings`.
104 '';
105 type = lib.types.path;
106 };
107
108 configFiles.log4jProperties = lib.mkOption {
109 description = "Kafka log4j property configuration file path";
110 type = lib.types.path;
111 default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
112 defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
113 };
114
115 formatLogDirs = lib.mkOption {
116 description = ''
117 Whether to format log dirs in KRaft mode if all log dirs are
118 unformatted, ie. they contain no meta.properties.
119 '';
120 type = lib.types.bool;
121 default = false;
122 };
123
124 formatLogDirsIgnoreFormatted = lib.mkOption {
125 description = ''
126 Whether to ignore already formatted log dirs when formatting log dirs,
127 instead of failing. Useful when replacing or adding disks.
128 '';
129 type = lib.types.bool;
130 default = false;
131 };
132
133 log4jProperties = lib.mkOption {
134 description = "Kafka log4j property configuration.";
135 default = ''
136 log4j.rootLogger=INFO, stdout
137
138 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
139 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
140 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
141 '';
142 type = lib.types.lines;
143 };
144
145 jvmOptions = lib.mkOption {
146 description = "Extra command line options for the JVM running Kafka.";
147 default = [ ];
148 type = lib.types.listOf lib.types.str;
149 example = [
150 "-Djava.net.preferIPv4Stack=true"
151 "-Dcom.sun.management.jmxremote"
152 "-Dcom.sun.management.jmxremote.local.only=true"
153 ];
154 };
155
156 package = lib.mkPackageOption pkgs "apacheKafka" { };
157
158 jre = lib.mkOption {
159 description = "The JRE with which to run Kafka";
160 default = cfg.package.passthru.jre;
161 defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
162 type = lib.types.package;
163 };
164 };
165
166 imports = [
167 (lib.mkRenamedOptionModule
168 [ "services" "apache-kafka" "brokerId" ]
169 [ "services" "apache-kafka" "settings" ''broker.id'' ]
170 )
171 (lib.mkRenamedOptionModule
172 [ "services" "apache-kafka" "logDirs" ]
173 [ "services" "apache-kafka" "settings" ''log.dirs'' ]
174 )
175 (lib.mkRenamedOptionModule
176 [ "services" "apache-kafka" "zookeeper" ]
177 [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ]
178 )
179
180 (lib.mkRemovedOptionModule [
181 "services"
182 "apache-kafka"
183 "port"
184 ] "Please see services.apache-kafka.settings.listeners and its documentation instead")
185 (lib.mkRemovedOptionModule [
186 "services"
187 "apache-kafka"
188 "hostname"
189 ] "Please see services.apache-kafka.settings.listeners and its documentation instead")
190 (lib.mkRemovedOptionModule [
191 "services"
192 "apache-kafka"
193 "extraProperties"
194 ] "Please see services.apache-kafka.settings and its documentation instead")
195 (lib.mkRemovedOptionModule [
196 "services"
197 "apache-kafka"
198 "serverProperties"
199 ] "Please see services.apache-kafka.settings and its documentation instead")
200 ];
201
202 config = lib.mkIf cfg.enable {
203 services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
204
205 users.users.apache-kafka = {
206 isSystemUser = true;
207 group = "apache-kafka";
208 description = "Apache Kafka daemon user";
209 };
210 users.groups.apache-kafka = { };
211
212 systemd.tmpfiles.rules = map (
213 logDir: "d '${logDir}' 0700 apache-kafka - - -"
214 ) cfg.settings."log.dirs";
215
216 systemd.services.apache-kafka = {
217 description = "Apache Kafka Daemon";
218 wantedBy = [ "multi-user.target" ];
219 after = [ "network.target" ];
220 preStart = lib.mkIf cfg.formatLogDirs (
221 if cfg.formatLogDirsIgnoreFormatted then
222 ''
223 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
224 ''
225 else
226 ''
227 if ${
228 lib.concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"
229 }; then
230 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
231 fi
232 ''
233 );
234 serviceConfig = {
235 ExecStart = ''
236 ${cfg.jre}/bin/java \
237 -cp "${cfg.package}/libs/*" \
238 -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
239 ${toString cfg.jvmOptions} \
240 kafka.Kafka \
241 ${cfg.configFiles.serverProperties}
242 '';
243 User = "apache-kafka";
244 SuccessExitStatus = "0 143";
245 };
246 };
247 };
248
249 meta.doc = ./kafka.md;
250 meta.maintainers = with lib.maintainers; [
251 srhb
252 ];
253}