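# A sketch of how `settings` values end up in the generated server.properties
# (illustrative; see `mkPropertyString`, `stringlySettings` and `generator`
# below for the actual rendering): an attrset such as
#
#   {
#     "broker.id" = 1;
#     "log.dirs" = [ "/var/lib/apache-kafka" ];
#     "auto.create.topics.enable" = false;
#     "zookeeper.connect" = null;
#   }
#
# has its null entries dropped, lists joined with commas and booleans rendered
# as true/false, yielding (modulo the javaProperties generator's spacing and
# escaping) entries equivalent to:
#
#   auto.create.topics.enable=false
#   broker.id=1
#   log.dirs=/var/lib/apache-kafka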
{ config, lib, pkgs, ... }:

with lib;

let
  cfg = config.services.apache-kafka;

  # The `javaProperties` generator takes care of various escaping rules and
  # generation of the properties file, but we'll handle stringly conversion
  # ourselves in mkPropertyString and stringlySettings, since we know more
  # about the specifically allowed format, e.g. for lists of this type, and we
  # don't want to coerce-downsample values to str too early by having the
  # coercedTypes from javaProperties directly in our NixOS option types.
  #
  # Make sure every `freeformType` and any specific option type in `settings` is
  # supported here.

  mkPropertyString = let
    render = {
      bool = boolToString;
      int = toString;
      list = concatMapStringsSep "," mkPropertyString;
      string = id;
    };
  in
    v: render.${builtins.typeOf v} v;

  stringlySettings = mapAttrs (_: mkPropertyString)
    (filterAttrs (_: v: v != null) cfg.settings);

  generator = (pkgs.formats.javaProperties {}).generate;
in {

  options.services.apache-kafka = {
    enable = mkEnableOption "Apache Kafka event streaming broker";

    settings = mkOption {
      description = ''
        [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
        {file}`server.properties`.

        Note that .properties files contain mappings from string to string.
        Keys with dots are NOT represented by nested attrs in these settings,
        but instead as quoted strings (i.e. `settings."broker.id"`, NOT
        `settings.broker.id`).
      '';
      type = types.submodule {
        freeformType = with types; let
          primitive = oneOf [bool int str];
        in lazyAttrsOf (nullOr (either primitive (listOf primitive)));

        options = {
          "broker.id" = mkOption {
            description = "Broker ID. -1 or null to auto-allocate in ZooKeeper mode.";
            default = null;
            type = with types; nullOr int;
          };

          "log.dirs" = mkOption {
            description = "Log file directories.";
            # Deliberately leave out old default and use the rewrite opportunity
            # to have users choose a safer value -- /tmp might be volatile and is a
            # slightly scary default choice.
            # default = [ "/tmp/apache-kafka" ];
            type = with types; listOf path;
          };

          "listeners" = mkOption {
            description = ''
              Kafka Listener List.
              See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
            '';
            type = types.listOf types.str;
            default = [ "PLAINTEXT://localhost:9092" ];
          };
        };
      };
    };

    clusterId = mkOption {
      description = ''
        KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`.
      '';
      type = with types; nullOr str;
      default = null;
    };

    configFiles.serverProperties = mkOption {
      description = ''
        Kafka server.properties configuration file path.
        Defaults to the rendered `settings`.
      '';
      type = types.path;
    };

    configFiles.log4jProperties = mkOption {
      description = "Kafka log4j property configuration file path.";
      type = types.path;
      default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
      defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
    };

    formatLogDirs = mkOption {
      description = ''
        Whether to format log dirs in KRaft mode if all log dirs are
        unformatted, i.e. they contain no meta.properties.
      '';
      type = types.bool;
      default = false;
    };

    formatLogDirsIgnoreFormatted = mkOption {
      description = ''
        Whether to ignore already formatted log dirs when formatting log dirs,
        instead of failing. Useful when replacing or adding disks.
      '';
      type = types.bool;
      default = false;
    };

    log4jProperties = mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout

        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = types.lines;
    };

    jvmOptions = mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [];
      type = types.listOf types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

    package = mkPackageOption pkgs "apacheKafka" { };

    jre = mkOption {
      description = "The JRE with which to run Kafka.";
      default = cfg.package.passthru.jre;
      defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
      type = types.package;
    };
  };

  imports = [
    (mkRenamedOptionModule
      [ "services" "apache-kafka" "brokerId" ]
      [ "services" "apache-kafka" "settings" ''broker.id'' ])
    (mkRenamedOptionModule
      [ "services" "apache-kafka" "logDirs" ]
      [ "services" "apache-kafka" "settings" ''log.dirs'' ])
    (mkRenamedOptionModule
      [ "services" "apache-kafka" "zookeeper" ]
      [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])

    (mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
      "Please see services.apache-kafka.settings.listeners and its documentation instead")
    (mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
      "Please see services.apache-kafka.settings.listeners and its documentation instead")
    (mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
      "Please see services.apache-kafka.settings and its documentation instead")
    (mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
      "Please see services.apache-kafka.settings and its documentation instead")
  ];

  config = mkIf cfg.enable {
    services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;

    users.users.apache-kafka = {
      isSystemUser = true;
      group = "apache-kafka";
      description = "Apache Kafka daemon user";
    };
    users.groups.apache-kafka = {};

    systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];
      preStart = mkIf cfg.formatLogDirs
        (if cfg.formatLogDirsIgnoreFormatted then ''
          ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
        '' else ''
          if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
            ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
          fi
        '');
      serviceConfig = {
        ExecStart = ''
          ${cfg.jre}/bin/java \
            -cp "${cfg.package}/libs/*" \
            -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${cfg.configFiles.serverProperties}
        '';
        User = "apache-kafka";
        SuccessExitStatus = "0 143";
      };
    };
  };

  meta.doc = ./kafka.md;
  meta.maintainers = with lib.maintainers; [
    srhb
  ];
}