1{ config, lib, pkgs, ... }:
2
3with lib;
4
5let
6 cfg = config.services.apache-kafka;
7
# Contents of Kafka's server.properties file.
#
# When the user sets `cfg.serverProperties`, that text is used verbatim and
# none of the individual broker options below are consulted. Otherwise the
# file is assembled from brokerId, port, hostname, logDirs, zookeeper and
# extraProperties.
serverProperties =
  let
    # `toString null` evaluates to the empty string, so leaving
    # extraProperties unset just produces an empty trailing line.
    generatedProperties = ''
      # Generated by nixos
      broker.id=${toString cfg.brokerId}
      port=${toString cfg.port}
      host.name=${cfg.hostname}
      log.dirs=${concatStringsSep "," cfg.logDirs}
      zookeeper.connect=${cfg.zookeeper}
      ${toString cfg.extraProperties}
    '';
  in
    if cfg.serverProperties == null
    then generatedProperties
    else cfg.serverProperties;
21
# Store directory that holds both configuration files side by side.
# It is used twice by the service: as a classpath entry and as the
# location of the server.properties file passed to kafka.Kafka.
configDir =
  let
    serverPropertiesFile =
      pkgs.writeTextDir "server.properties" serverProperties;
    log4jPropertiesFile =
      pkgs.writeTextDir "log4j.properties" cfg.log4jProperties;
  in
    pkgs.buildEnv {
      name = "apache-kafka-conf";
      paths = [ serverPropertiesFile log4jPropertiesFile ];
    };
29
30in {
31
# Option declarations for the Apache Kafka broker service.
#
# brokerId, port, hostname, logDirs, zookeeper and extraProperties are only
# used to generate server.properties; they are all ignored when
# `serverProperties` is set explicitly.
options.services.apache-kafka = {
  enable = mkOption {
    description = "Whether to enable Apache Kafka.";
    default = false;
    type = types.bool;
  };

  brokerId = mkOption {
    description = "Broker ID.";
    default = 0;
    type = types.int;
  };

  port = mkOption {
    description = "Port number the broker should listen on.";
    default = 9092;
    type = types.int;
  };

  hostname = mkOption {
    description = "Hostname the broker should bind to.";
    default = "localhost";
    # types.str (not the deprecated types.string): conflicting definitions
    # are reported as an error instead of being silently concatenated.
    type = types.str;
  };

  logDirs = mkOption {
    description = "Log file directories";
    default = [ "/tmp/kafka-logs" ];
    type = types.listOf types.path;
  };

  zookeeper = mkOption {
    description = "Zookeeper connection string";
    default = "localhost:2181";
    # types.str for the same reason as `hostname`: two definitions must
    # not be merged into one concatenated connection string.
    type = types.str;
  };

  extraProperties = mkOption {
    description = "Extra properties for server.properties.";
    type = types.nullOr types.lines;
    default = null;
  };

  serverProperties = mkOption {
    description = ''
      Complete server.properties content. Other server.properties config
      options will be ignored if this option is used.
    '';
    type = types.nullOr types.lines;
    default = null;
  };

  log4jProperties = mkOption {
    description = "Kafka log4j property configuration.";
    default = ''
      log4j.rootLogger=INFO, stdout

      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    '';
    type = types.lines;
  };

  jvmOptions = mkOption {
    description = "Extra command line options for the JVM running Kafka.";
    default = [
      "-server"
      "-Xmx1G"
      "-Xms1G"
      "-XX:+UseCompressedOops"
      "-XX:+UseParNewGC"
      "-XX:+UseConcMarkSweepGC"
      "-XX:+CMSClassUnloadingEnabled"
      "-XX:+CMSScavengeBeforeRemark"
      "-XX:+DisableExplicitGC"
      "-Djava.awt.headless=true"
      "-Djava.net.preferIPv4Stack=true"
    ];
    type = types.listOf types.str;
    example = [
      "-Djava.net.preferIPv4Stack=true"
      "-Dcom.sun.management.jmxremote"
      "-Dcom.sun.management.jmxremote.local.only=true"
    ];
  };

  package = mkOption {
    description = "The kafka package to use";
    default = pkgs.apacheKafka;
    defaultText = "pkgs.apacheKafka";
    type = types.package;
  };

};
127
# System configuration applied only when services.apache-kafka.enable is
# true: installs the package, declares the daemon user and defines the
# systemd unit that runs the broker.
config = mkIf cfg.enable {

  environment.systemPackages = [cfg.package];

  users.extraUsers = singleton {
    name = "apache-kafka";
    uid = config.ids.uids.apache-kafka;
    description = "Apache Kafka daemon user";
    # The first log directory doubles as the user's home directory.
    home = head cfg.logDirs;
  };

  systemd.services.apache-kafka = {
    description = "Apache Kafka Daemon";
    wantedBy = [ "multi-user.target" ];
    after = [ "network-interfaces.target" ];
    serviceConfig = {
      # configDir is on the classpath (so log4j.properties is found there)
      # and also supplies the server.properties argument.
      ExecStart = ''
        ${pkgs.jre}/bin/java \
        -cp "${cfg.package}/libs/*:${configDir}" \
        ${toString cfg.jvmOptions} \
        kafka.Kafka \
        ${configDir}/server.properties
      '';
      User = "apache-kafka";
      # Only ExecStart runs as `User`; preStart keeps root privileges so it
      # can create and chown the log directories.
      PermissionsStartOnly = true;
      # 143 = 128 + 15 (SIGTERM): presumably the JVM's exit status when
      # systemd stops the unit, so it is not reported as a failure.
      SuccessExitStatus = "0 143";
    };
    preStart =
      let
        # Shell-quote every directory so the paths created here are exactly
        # the ones written to log.dirs in server.properties, even when a
        # path contains spaces or shell metacharacters.
        quotedLogDirs = concatStringsSep " " (map escapeShellArg cfg.logDirs);
      in ''
        mkdir -m 0700 -p ${quotedLogDirs}
        if [ "$(id -u)" = 0 ]; then
          chown apache-kafka ${quotedLogDirs};
        fi
      '';
  };

};
164}