1{ config, lib, pkgs, ... }:
2
3with lib;
4
5let
6 cfg = config.services.apache-kafka;
7
# Contents of server.properties. A user-supplied `serverProperties`
# value is used verbatim; otherwise the file is generated from the
# individual broker options.
serverProperties =
  let
    # Fallback file assembled from brokerId, port, hostname, logDirs,
    # zookeeper and (when set) extraProperties.
    generated = ''
      # Generated by nixos
      broker.id=${toString cfg.brokerId}
      port=${toString cfg.port}
      host.name=${cfg.hostname}
      log.dirs=${concatStringsSep "," cfg.logDirs}
      zookeeper.connect=${cfg.zookeeper}
      ${optionalString (cfg.extraProperties != null) cfg.extraProperties}
    '';
  in
    if cfg.serverProperties == null then generated else cfg.serverProperties;
21
# Single directory containing server.properties and log4j.properties.
# The service adds it to the JVM classpath and passes its
# server.properties to Kafka on the command line.
configDir =
  let
    serverPropertiesFile = pkgs.writeTextDir "server.properties" serverProperties;
    log4jPropertiesFile = pkgs.writeTextDir "log4j.properties" cfg.log4jProperties;
  in
    pkgs.buildEnv {
      name = "apache-kafka-conf";
      paths = [
        serverPropertiesFile
        log4jPropertiesFile
      ];
    };
29
30in {
31
# User-facing options of the Apache Kafka module.
#
# brokerId, port, hostname, logDirs, zookeeper and extraProperties are
# only used to generate server.properties; they have no effect on that
# file when `serverProperties` is set.
options.services.apache-kafka = {
  enable = mkOption {
    description = "Whether to enable Apache Kafka.";
    default = false;
    type = types.bool;
  };

  brokerId = mkOption {
    description = "Broker ID.";
    default = 0;
    type = types.int;
  };

  port = mkOption {
    description = "Port number the broker should listen on.";
    default = 9092;
    type = types.int;
  };

  hostname = mkOption {
    description = "Hostname the broker should bind to.";
    default = "localhost";
    # `types.str` rather than the deprecated `types.string`: conflicting
    # definitions are reported instead of being silently concatenated.
    type = types.str;
  };

  logDirs = mkOption {
    description = "Log file directories";
    default = [ "/tmp/kafka-logs" ];
    type = types.listOf types.path;
  };

  zookeeper = mkOption {
    description = "Zookeeper connection string";
    default = "localhost:2181";
    # See the note on `hostname` for why this is `types.str`.
    type = types.str;
  };

  # Appended to the end of the generated server.properties.
  extraProperties = mkOption {
    description = "Extra properties for server.properties.";
    type = types.nullOr types.lines;
    default = null;
  };

  # When non-null this replaces the generated server.properties entirely.
  serverProperties = mkOption {
    description = ''
      Complete server.properties content. Other server.properties config
      options will be ignored if this option is used.
    '';
    type = types.nullOr types.lines;
    default = null;
  };

  # Written to log4j.properties in the configuration directory.
  log4jProperties = mkOption {
    description = "Kafka log4j property configuration.";
    default = ''
      log4j.rootLogger=INFO, stdout

      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    '';
    type = types.lines;
  };

  # Passed to the java command, space-separated, before the main class.
  jvmOptions = mkOption {
    description = "Extra command line options for the JVM running Kafka.";
    default = [
      "-server"
      "-Xmx1G"
      "-Xms1G"
      "-XX:+UseCompressedOops"
      "-XX:+UseParNewGC"
      "-XX:+UseConcMarkSweepGC"
      "-XX:+CMSClassUnloadingEnabled"
      "-XX:+CMSScavengeBeforeRemark"
      "-XX:+DisableExplicitGC"
      "-Djava.awt.headless=true"
      "-Djava.net.preferIPv4Stack=true"
    ];
    type = types.listOf types.str;
    example = [
      "-Djava.net.preferIPv4Stack=true"
      "-Dcom.sun.management.jmxremote"
      "-Dcom.sun.management.jmxremote.local.only=true"
    ];
  };

  # Provides the jars under libs/ that form the Kafka classpath.
  package = mkOption {
    description = "The kafka package to use";

    default = pkgs.apacheKafka;

    type = types.package;
  };

};
128
# System configuration applied when the service is enabled: installs the
# package, declares the daemon user and defines the systemd unit.
config = mkIf cfg.enable {

  environment.systemPackages = [ cfg.package ];

  users.extraUsers = singleton {
    name = "apache-kafka";
    uid = config.ids.uids.apache-kafka;
    description = "Apache Kafka daemon user";
    # The first log directory doubles as the user's home directory.
    home = head cfg.logDirs;
  };

  systemd.services.apache-kafka = {
    description = "Apache Kafka Daemon";
    wantedBy = [ "multi-user.target" ];
    after = [ "network-interfaces.target" ];
    serviceConfig = {
      ExecStart = ''
        ${pkgs.jre}/bin/java \
          -cp "${cfg.package}/libs/*:${configDir}" \
          ${toString cfg.jvmOptions} \
          kafka.Kafka \
          ${configDir}/server.properties
      '';
      User = "apache-kafka";
      # Apply User= to ExecStart only, so preStart can still create and
      # chown the log directories with elevated privileges.
      PermissionsStartOnly = true;
      # 143 = 128 + SIGTERM: what the JVM exits with when systemd stops it.
      SuccessExitStatus = "0 143";
    };
    preStart =
      let
        # Quote every path so directories containing whitespace or shell
        # metacharacters reach mkdir/chown as single, literal arguments.
        quotedLogDirs = concatStringsSep " " (map escapeShellArg cfg.logDirs);
      in
      ''
        mkdir -m 0700 -p ${quotedLogDirs}
        if [ "$(id -u)" = 0 ]; then
          chown apache-kafka ${quotedLogDirs}
        fi
      '';
  };

};
165}