at master 8.1 kB view raw
# NixOS module that runs an Apache Kafka broker as a systemd service.
#
# It declares the `services.apache-kafka.*` options, renders `settings` into a
# Java .properties file, and, when enabled, defines the service user/group,
# tmpfiles rules for the log directories and the systemd unit itself.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  cfg = config.services.apache-kafka;

  # The `javaProperties` generator takes care of various escaping rules and
  # generation of the properties file, but we'll handle stringly conversion
  # ourselves in mkPropertyString and stringlySettings, since we know more
  # about the specifically allowed format eg. for lists of this type, and we
  # don't want to coerce-downsample values to str too early by having the
  # coercedTypes from javaProperties directly in our NixOS option types.
  #
  # Make sure every `freeformType` and any specific option type in `settings` is
  # supported here.

  # Convert a single settings value to the string written into the properties
  # file. Dispatches on `builtins.typeOf`; lists are rendered recursively and
  # joined with commas.
  mkPropertyString =
    let
      render = {
        bool = lib.boolToString;
        int = toString;
        list = lib.concatMapStringsSep "," mkPropertyString;
        # `log.dirs` is `listOf path`, which also accepts Nix path literals.
        # `toString` yields the absolute path without copying it to the store.
        path = toString;
        string = lib.id;
      };
    in
    v: render.${builtins.typeOf v} v;

  # `cfg.settings` with null entries dropped and every remaining value
  # converted to a string.
  stringlySettings = lib.mapAttrs (_: mkPropertyString) (
    lib.filterAttrs (_: v: v != null) cfg.settings
  );

  # Writes an attrset of strings to a .properties file in the store.
  generator = (pkgs.formats.javaProperties { }).generate;
in
{

  options.services.apache-kafka = {
    enable = lib.mkEnableOption "Apache Kafka event streaming broker";

    settings = lib.mkOption {
      description = ''
        [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
        {file}`server.properties`.

        Note that .properties files contain mappings from string to string.
        Keys with dots are NOT represented by nested attrs in these settings,
        but instead as quoted strings (ie. `settings."broker.id"`, NOT
        `settings.broker.id`).
      '';
      type = lib.types.submodule {
        # Any key not declared below may hold null, a primitive, or a list of
        # primitives; mkPropertyString must be able to render each of these.
        freeformType =
          with lib.types;
          let
            primitive = oneOf [
              bool
              int
              str
            ];
          in
          lazyAttrsOf (nullOr (either primitive (listOf primitive)));

        options = {
          "broker.id" = lib.mkOption {
            description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
            default = null;
            type = with lib.types; nullOr int;
          };

          "log.dirs" = lib.mkOption {
            description = "Log file directories.";
            # Deliberately leave out old default and use the rewrite opportunity
            # to have users choose a safer value -- /tmp might be volatile and is a
            # slightly scary default choice.
            # default = [ "/tmp/apache-kafka" ];
            type = with lib.types; listOf path;
          };

          "listeners" = lib.mkOption {
            description = ''
              Kafka Listener List.
              See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
            '';
            type = lib.types.listOf lib.types.str;
            default = [ "PLAINTEXT://localhost:9092" ];
          };
        };
      };
    };

    clusterId = lib.mkOption {
      description = ''
        KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
      '';
      type = with lib.types; nullOr str;
      default = null;
    };

    # No `default` here: the rendered `settings` file is supplied from the
    # `config` section below (at mkDefault priority) when the service is enabled.
    configFiles.serverProperties = lib.mkOption {
      description = ''
        Kafka server.properties configuration file path.
        Defaults to the rendered `settings`.
      '';
      type = lib.types.path;
    };

    configFiles.log4jProperties = lib.mkOption {
      description = "Kafka log4j property configuration file path";
      type = lib.types.path;
      default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
      defaultText = lib.literalExpression ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
    };

    formatLogDirs = lib.mkOption {
      description = ''
        Whether to format log dirs in KRaft mode if all log dirs are
        unformatted, ie. they contain no meta.properties.
      '';
      type = lib.types.bool;
      default = false;
    };

    formatLogDirsIgnoreFormatted = lib.mkOption {
      description = ''
        Whether to ignore already formatted log dirs when formatting log dirs,
        instead of failing. Useful when replacing or adding disks.
      '';
      type = lib.types.bool;
      default = false;
    };

    log4jProperties = lib.mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout

        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = lib.types.lines;
    };

    jvmOptions = lib.mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [ ];
      type = lib.types.listOf lib.types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

    package = lib.mkPackageOption pkgs "apacheKafka" { };

    jre = lib.mkOption {
      description = "The JRE with which to run Kafka";
      default = cfg.package.passthru.jre;
      defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
      type = lib.types.package;
    };
  };

  # Compatibility shims for the option names used before `settings` existed:
  # renamed options are forwarded, removed ones produce an explanatory error.
  imports = [
    (lib.mkRenamedOptionModule
      [ "services" "apache-kafka" "brokerId" ]
      [ "services" "apache-kafka" "settings" ''broker.id'' ]
    )
    (lib.mkRenamedOptionModule
      [ "services" "apache-kafka" "logDirs" ]
      [ "services" "apache-kafka" "settings" ''log.dirs'' ]
    )
    (lib.mkRenamedOptionModule
      [ "services" "apache-kafka" "zookeeper" ]
      [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ]
    )

    (lib.mkRemovedOptionModule [
      "services"
      "apache-kafka"
      "port"
    ] "Please see services.apache-kafka.settings.listeners and its documentation instead")
    (lib.mkRemovedOptionModule [
      "services"
      "apache-kafka"
      "hostname"
    ] "Please see services.apache-kafka.settings.listeners and its documentation instead")
    (lib.mkRemovedOptionModule [
      "services"
      "apache-kafka"
      "extraProperties"
    ] "Please see services.apache-kafka.settings and its documentation instead")
    (lib.mkRemovedOptionModule [
      "services"
      "apache-kafka"
      "serverProperties"
    ] "Please see services.apache-kafka.settings and its documentation instead")
  ];

  config = lib.mkIf cfg.enable {
    assertions = [
      {
        # Both preStart variants interpolate `cfg.clusterId`; without this
        # check a null value fails with an opaque string-coercion error.
        assertion = cfg.formatLogDirs -> (cfg.clusterId != null);
        message = "services.apache-kafka.clusterId must be set when services.apache-kafka.formatLogDirs is enabled.";
      }
    ];

    # mkDefault so that a user-provided file overrides the rendered settings
    # without needing mkForce, as the option description promises.
    services.apache-kafka.configFiles.serverProperties = lib.mkDefault (
      generator "server.properties" stringlySettings
    );

    users.users.apache-kafka = {
      isSystemUser = true;
      group = "apache-kafka";
      description = "Apache Kafka daemon user";
    };
    users.groups.apache-kafka = { };

    # One tmpfiles `d` rule per log dir, owned by the service user with mode
    # 0700. `toString` keeps path-literal values from being copied to the store.
    systemd.tmpfiles.rules = map (
      logDir: "d '${toString logDir}' 0700 apache-kafka - - -"
    ) cfg.settings."log.dirs";

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];
      # Only defined when formatLogDirs is set. Either always run the format
      # command with --ignore-formatted, or run it only when none of the log
      # dirs contains a meta.properties file.
      preStart = lib.mkIf cfg.formatLogDirs (
        if cfg.formatLogDirsIgnoreFormatted then
          ''
            ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
          ''
        else
          ''
            if ${
              lib.concatMapStringsSep " && " (
                l: ''[ ! -f "${toString l}/meta.properties" ]''
              ) cfg.settings."log.dirs"
            }; then
              ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
            fi
          ''
      );
      serviceConfig = {
        ExecStart = ''
          ${cfg.jre}/bin/java \
            -cp "${cfg.package}/libs/*" \
            -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${cfg.configFiles.serverProperties}
        '';
        User = "apache-kafka";
        # NOTE(review): 143 is 128 + SIGTERM(15), presumably the JVM's exit
        # status when stopped by systemd -- confirm.
        SuccessExitStatus = "0 143";
      };
    };
  };

  meta.doc = ./kafka.md;
  meta.maintainers = with lib.maintainers; [
    srhb
  ];
}