1{ config, lib, pkgs, ... }:
2
3with lib;
4
5let
6 cfg = config.services.apache-kafka;
7
8 serverProperties =
9 if cfg.serverProperties != null then
10 cfg.serverProperties
11 else
12 ''
13 # Generated by nixos
14 broker.id=${toString cfg.brokerId}
15 port=${toString cfg.port}
16 host.name=${cfg.hostname}
17 log.dirs=${concatStringsSep "," cfg.logDirs}
18 zookeeper.connect=${cfg.zookeeper}
19 ${toString cfg.extraProperties}
20 '';
21
22 serverConfig = pkgs.writeText "server.properties" serverProperties;
23 logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
24
25in {
26
27 options.services.apache-kafka = {
28 enable = mkOption {
29 description = "Whether to enable Apache Kafka.";
30 default = false;
31 type = types.bool;
32 };
33
34 brokerId = mkOption {
35 description = "Broker ID.";
36 default = -1;
37 type = types.int;
38 };
39
40 port = mkOption {
41 description = "Port number the broker should listen on.";
42 default = 9092;
43 type = types.int;
44 };
45
46 hostname = mkOption {
47 description = "Hostname the broker should bind to.";
48 default = "localhost";
49 type = types.str;
50 };
51
52 logDirs = mkOption {
53 description = "Log file directories";
54 default = [ "/tmp/kafka-logs" ];
55 type = types.listOf types.path;
56 };
57
58 zookeeper = mkOption {
59 description = "Zookeeper connection string";
60 default = "localhost:2181";
61 type = types.str;
62 };
63
64 extraProperties = mkOption {
65 description = "Extra properties for server.properties.";
66 type = types.nullOr types.lines;
67 default = null;
68 };
69
70 serverProperties = mkOption {
71 description = ''
72 Complete server.properties content. Other server.properties config
73 options will be ignored if this option is used.
74 '';
75 type = types.nullOr types.lines;
76 default = null;
77 };
78
79 log4jProperties = mkOption {
80 description = "Kafka log4j property configuration.";
81 default = ''
82 log4j.rootLogger=INFO, stdout
83
84 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
85 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
86 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
87 '';
88 type = types.lines;
89 };
90
91 jvmOptions = mkOption {
92 description = "Extra command line options for the JVM running Kafka.";
93 default = [];
94 type = types.listOf types.str;
95 example = [
96 "-Djava.net.preferIPv4Stack=true"
97 "-Dcom.sun.management.jmxremote"
98 "-Dcom.sun.management.jmxremote.local.only=true"
99 ];
100 };
101
102 package = mkOption {
103 description = "The kafka package to use";
104 default = pkgs.apacheKafka;
105 defaultText = literalExpression "pkgs.apacheKafka";
106 type = types.package;
107 };
108
109 jre = mkOption {
110 description = "The JRE with which to run Kafka";
111 default = cfg.package.passthru.jre;
112 defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
113 type = types.package;
114 };
115
116 };
117
118 config = mkIf cfg.enable {
119
120 environment.systemPackages = [cfg.package];
121
122 users.users.apache-kafka = {
123 isSystemUser = true;
124 group = "apache-kafka";
125 description = "Apache Kafka daemon user";
126 home = head cfg.logDirs;
127 };
128 users.groups.apache-kafka = {};
129
130 systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
131
132 systemd.services.apache-kafka = {
133 description = "Apache Kafka Daemon";
134 wantedBy = [ "multi-user.target" ];
135 after = [ "network.target" ];
136 serviceConfig = {
137 ExecStart = ''
138 ${cfg.jre}/bin/java \
139 -cp "${cfg.package}/libs/*" \
140 -Dlog4j.configuration=file:${logConfig} \
141 ${toString cfg.jvmOptions} \
142 kafka.Kafka \
143 ${serverConfig}
144 '';
145 User = "apache-kafka";
146 SuccessExitStatus = "0 143";
147 };
148 };
149
150 };
151}