1{ config, lib, pkgs, ... }:
2
3with lib;
4
5let
# Shorthand for the values of this module's options.
cfg = config.services.apache-kafka;

# server.properties text assembled from the individual options.
# `toString` renders the integers and turns a null extraProperties into "".
generatedProperties = ''
  # Generated by nixos
  broker.id=${toString cfg.brokerId}
  port=${toString cfg.port}
  host.name=${cfg.hostname}
  log.dirs=${concatStringsSep "," cfg.logDirs}
  zookeeper.connect=${cfg.zookeeper}
  ${toString cfg.extraProperties}
'';

# A user-supplied cfg.serverProperties replaces the generated text wholesale;
# otherwise the text built above is used.
serverProperties =
  if cfg.serverProperties == null
  then generatedProperties
  else cfg.serverProperties;

# Files handed to the broker on its command line (see ExecStart).
logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
serverConfig = pkgs.writeText "server.properties" serverProperties;
24
25in {
26
# Option declarations for the Apache Kafka broker service.
#
# String-valued scalars use `types.str` (not the deprecated `types.string`,
# which silently concatenates multiple definitions), matching the type
# already used for `jvmOptions` below.
options.services.apache-kafka = {
  enable = mkOption {
    description = "Whether to enable Apache Kafka.";
    default = false;
    type = types.bool;
  };

  # Rendered as `broker.id` in the generated server.properties.
  brokerId = mkOption {
    description = "Broker ID.";
    default = -1;
    type = types.int;
  };

  # Rendered as `port` in the generated server.properties.
  port = mkOption {
    description = "Port number the broker should listen on.";
    default = 9092;
    type = types.int;
  };

  # Rendered as `host.name`; must be a single value, hence `types.str`.
  hostname = mkOption {
    description = "Hostname the broker should bind to.";
    default = "localhost";
    type = types.str;
  };

  # Joined with "," for `log.dirs`; the first entry is also used as the
  # home directory of the daemon user.
  logDirs = mkOption {
    description = "Log file directories";
    default = [ "/tmp/kafka-logs" ];
    type = types.listOf types.path;
  };

  # Rendered as `zookeeper.connect`; must be a single value, hence `types.str`.
  zookeeper = mkOption {
    description = "Zookeeper connection string";
    default = "localhost:2181";
    type = types.str;
  };

  # Appended verbatim after the generated properties; null adds nothing.
  extraProperties = mkOption {
    description = "Extra properties for server.properties.";
    type = types.nullOr types.lines;
    default = null;
  };

  # When non-null this replaces the generated server.properties entirely.
  serverProperties = mkOption {
    description = ''
      Complete server.properties content. Other server.properties config
      options will be ignored if this option is used.
    '';
    type = types.nullOr types.lines;
    default = null;
  };

  # Written to a file and passed to the JVM via -Dlog4j.configuration.
  log4jProperties = mkOption {
    description = "Kafka log4j property configuration.";
    default = ''
      log4j.rootLogger=INFO, stdout

      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    '';
    type = types.lines;
  };

  # Space-joined and placed on the java command line before the main class.
  jvmOptions = mkOption {
    description = "Extra command line options for the JVM running Kafka.";
    default = [
      "-server"
      "-Xmx1G"
      "-Xms1G"
      "-XX:+UseCompressedOops"
      "-XX:+UseParNewGC"
      "-XX:+UseConcMarkSweepGC"
      "-XX:+CMSClassUnloadingEnabled"
      "-XX:+CMSScavengeBeforeRemark"
      "-XX:+DisableExplicitGC"
      "-Djava.awt.headless=true"
      "-Djava.net.preferIPv4Stack=true"
    ];
    type = types.listOf types.str;
    example = [
      "-Djava.net.preferIPv4Stack=true"
      "-Dcom.sun.management.jmxremote"
      "-Dcom.sun.management.jmxremote.local.only=true"
    ];
  };

  # Its libs/ directory forms the broker's classpath; it is also added to
  # environment.systemPackages.
  package = mkOption {
    description = "The kafka package to use";
    default = pkgs.apacheKafka;
    defaultText = "pkgs.apacheKafka";
    type = types.package;
  };

};
122
# System configuration applied only when services.apache-kafka.enable is set:
# installs the package, declares the daemon user and defines the systemd unit.
config = mkIf cfg.enable (
  let
    # Space-separated data directories, as passed to mkdir/chown in preStart.
    logDirArgs = concatStringsSep " " cfg.logDirs;
  in {

    environment.systemPackages = [ cfg.package ];

    # Dedicated account the broker runs as; its home is the first log dir.
    users.extraUsers = [
      {
        name = "apache-kafka";
        uid = config.ids.uids.apache-kafka;
        description = "Apache Kafka daemon user";
        home = head cfg.logDirs;
      }
    ];

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      after = [ "network.target" ];
      wantedBy = [ "multi-user.target" ];

      # Create the data directories and, when running as root, hand them
      # over to the daemon user before the broker starts.
      preStart = ''
        mkdir -m 0700 -p ${logDirArgs}
        if [ "$(id -u)" = 0 ]; then
        chown apache-kafka ${logDirArgs};
        fi
      '';

      serviceConfig = {
        User = "apache-kafka";
        # Only the main process drops to User; preStart keeps full permissions.
        PermissionsStartOnly = true;
        # 143 is presumably the JVM's exit code after SIGTERM (128 + 15);
        # treat it as a clean stop.
        SuccessExitStatus = "0 143";
        # Launch the broker directly with java: classpath from the package's
        # libs/, the generated log4j config, then the server.properties file.
        ExecStart = ''
          ${pkgs.jre}/bin/java \
          -cp "${cfg.package}/libs/*" \
          -Dlog4j.configuration=file:${logConfig} \
          ${toString cfg.jvmOptions} \
          kafka.Kafka \
          ${serverConfig}
        '';
      };
    };

  }
);
160}