Merge pull request #224611 from srhb/kafka-settings-module

Kafka: Settings and KRaft support

Changed files
+261 -82
nixos
doc
manual
release-notes
modules
services
tests
+7
nixos/doc/manual/release-notes/rl-2311.section.md
···
- The binary of the package `cloud-sql-proxy` has changed from `cloud_sql_proxy` to `cloud-sql-proxy`.
+
- The module `services.apache-kafka` was largely rewritten and has several breaking changes. Specifically, the following things have changed:
+
+
- Most settings have been migrated to [services.apache-kafka.settings](#opt-services.apache-kafka.settings).
+
- Care must be taken when adapting an existing cluster to these changes, see [](#module-services-apache-kafka-migrating-to-settings).
+
- By virtue of being less opinionated, it is now possible to use the module to run Apache Kafka in KRaft mode instead of Zookeeper mode.
+
- [A few options](#module-services-apache-kafka-kraft) have been added to assist in this mode.
+
- Garage has been upgraded to 0.9.x. `services.garage.package` now needs to be explicitly set, so version upgrades can be done in a controlled fashion. For this, we expose `garage_x_y` attributes which can be set here.
- `voms` and `xrootd` now move the `$out/etc` content to the `$etc` output instead of `$out/etc.orig`, when the input argument `externalEtc` is not `null`.
+130 -58
nixos/modules/services/misc/apache-kafka.nix
···
let
cfg = config.services.apache-kafka;
-
serverProperties =
-
if cfg.serverProperties != null then
-
cfg.serverProperties
-
else
-
''
-
# Generated by nixos
-
broker.id=${toString cfg.brokerId}
-
port=${toString cfg.port}
-
host.name=${cfg.hostname}
-
log.dirs=${concatStringsSep "," cfg.logDirs}
-
zookeeper.connect=${cfg.zookeeper}
-
${toString cfg.extraProperties}
-
'';
+
# The `javaProperties` generator takes care of various escaping rules and
+
# generation of the properties file, but we'll handle stringly conversion
+
# ourselves in mkPropertySettings and stringlySettings, since we know more
+
# about the specifically allowed format eg. for lists of this type, and we
+
# don't want to coerce-downsample values to str too early by having the
+
# coercedTypes from javaProperties directly in our NixOS option types.
+
#
+
# Make sure every `freeformType` and any specific option type in `settings` is
+
# supported here.
+
+
mkPropertyString = let
+
render = {
+
bool = boolToString;
+
int = toString;
+
list = concatMapStringsSep "," mkPropertyString;
+
string = id;
+
};
+
in
+
v: render.${builtins.typeOf v} v;
-
serverConfig = pkgs.writeText "server.properties" serverProperties;
-
logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+
stringlySettings = mapAttrs (_: mkPropertyString)
+
(filterAttrs (_: v: v != null) cfg.settings);
+
generator = (pkgs.formats.javaProperties {}).generate;
in {
options.services.apache-kafka = {
-
enable = mkOption {
-
description = lib.mdDoc "Whether to enable Apache Kafka.";
-
default = false;
-
type = types.bool;
-
};
+
enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");
-
brokerId = mkOption {
-
description = lib.mdDoc "Broker ID.";
-
default = -1;
-
type = types.int;
-
};
+
settings = mkOption {
+
description = lib.mdDoc ''
+
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
+
{file}`server.properties`.
+
+
Note that .properties files contain mappings from string to string.
+
Keys with dots are NOT represented by nested attrs in these settings,
+
but instead as quoted strings (ie. `settings."broker.id"`, NOT
+
`settings.broker.id`).
+
'';
+
type = types.submodule {
+
freeformType = with types; let
+
primitive = oneOf [bool int str];
+
in lazyAttrsOf (nullOr (either primitive (listOf primitive)));
+
+
options = {
+
"broker.id" = mkOption {
+
description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
+
default = null;
+
type = with types; nullOr int;
+
};
+
+
"log.dirs" = mkOption {
+
description = lib.mdDoc "Log file directories.";
+
# Deliberately leave out old default and use the rewrite opportunity
+
# to have users choose a safer value -- /tmp might be volatile and is a
+
# slightly scary default choice.
+
# default = [ "/tmp/apache-kafka" ];
+
type = with types; listOf path;
+
};
-
port = mkOption {
-
description = lib.mdDoc "Port number the broker should listen on.";
-
default = 9092;
-
type = types.port;
+
"listeners" = mkOption {
+
description = lib.mdDoc ''
+
Kafka Listener List.
+
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
+
'';
+
type = types.listOf types.str;
+
default = [ "PLAINTEXT://localhost:9092" ];
+
};
+
};
+
};
};
-
hostname = mkOption {
-
description = lib.mdDoc "Hostname the broker should bind to.";
-
default = "localhost";
-
type = types.str;
+
clusterId = mkOption {
+
description = lib.mdDoc ''
+
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
+
'';
+
type = with types; nullOr str;
+
default = null;
};
-
logDirs = mkOption {
-
description = lib.mdDoc "Log file directories";
-
default = [ "/tmp/kafka-logs" ];
-
type = types.listOf types.path;
+
configFiles.serverProperties = mkOption {
+
description = lib.mdDoc ''
+
Kafka server.properties configuration file path.
+
Defaults to the rendered `settings`.
+
'';
+
type = types.path;
};
-
zookeeper = mkOption {
-
description = lib.mdDoc "Zookeeper connection string";
-
default = "localhost:2181";
-
type = types.str;
+
configFiles.log4jProperties = mkOption {
+
description = lib.mdDoc "Kafka log4j property configuration file path";
+
type = types.path;
+
default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+
defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
};
-
extraProperties = mkOption {
-
description = lib.mdDoc "Extra properties for server.properties.";
-
type = types.nullOr types.lines;
-
default = null;
+
formatLogDirs = mkOption {
+
description = lib.mdDoc ''
+
Whether to format log dirs in KRaft mode if all log dirs are
+
unformatted, ie. they contain no meta.properties.
+
'';
+
type = types.bool;
+
default = false;
};
-
serverProperties = mkOption {
+
formatLogDirsIgnoreFormatted = mkOption {
description = lib.mdDoc ''
-
Complete server.properties content. Other server.properties config
-
options will be ignored if this option is used.
+
Whether to ignore already formatted log dirs when formatting log dirs,
+
instead of failing. Useful when replacing or adding disks.
'';
-
type = types.nullOr types.lines;
-
default = null;
+
type = types.bool;
+
default = false;
};
log4jProperties = mkOption {
···
defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
type = types.package;
};
+
};
+
+
imports = [
+
(mkRenamedOptionModule
+
[ "services" "apache-kafka" "brokerId" ]
+
[ "services" "apache-kafka" "settings" ''broker.id'' ])
+
(mkRenamedOptionModule
+
[ "services" "apache-kafka" "logDirs" ]
+
[ "services" "apache-kafka" "settings" ''log.dirs'' ])
+
(mkRenamedOptionModule
+
[ "services" "apache-kafka" "zookeeper" ]
+
[ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])
-
};
+
(mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
+
"Please see services.apache-kafka.settings.listeners and its documentation instead")
+
(mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
+
"Please see services.apache-kafka.settings.listeners and its documentation instead")
+
(mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
+
"Please see services.apache-kafka.settings and its documentation instead")
+
(mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
+
"Please see services.apache-kafka.settings and its documentation instead")
+
];
config = mkIf cfg.enable {
-
-
environment.systemPackages = [cfg.package];
+
services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
users.users.apache-kafka = {
isSystemUser = true;
group = "apache-kafka";
description = "Apache Kafka daemon user";
-
home = head cfg.logDirs;
};
users.groups.apache-kafka = {};
-
systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
+
systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";
systemd.services.apache-kafka = {
description = "Apache Kafka Daemon";
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
+
preStart = mkIf cfg.formatLogDirs
+
(if cfg.formatLogDirsIgnoreFormatted then ''
+
${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
+
'' else ''
+
if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
+
${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
+
fi
+
'');
serviceConfig = {
ExecStart = ''
${cfg.jre}/bin/java \
-cp "${cfg.package}/libs/*" \
-
-Dlog4j.configuration=file:${logConfig} \
+
-Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
${toString cfg.jvmOptions} \
kafka.Kafka \
-
${serverConfig}
+
${cfg.configFiles.serverProperties}
'';
User = "apache-kafka";
SuccessExitStatus = "0 143";
};
};
+
};
-
};
+
meta.doc = ./kafka.md;
+
meta.maintainers = with lib.maintainers; [
+
srhb
+
];
}
+63
nixos/modules/services/misc/kafka.md
···
+
# Apache Kafka {#module-services-apache-kafka}
+
+
[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event
+
streaming platform
+
+
## Basic Usage {#module-services-apache-kafka-basic-usage}
+
+
The Apache Kafka service is configured almost exclusively through its
+
[settings](#opt-services.apache-kafka.settings) option, with each attribute
+
corresponding to the [upstream configuration
+
manual](https://kafka.apache.org/documentation/#configuration) broker settings.
+
+
## KRaft {#module-services-apache-kafka-kraft}
+
+
Unlike in Zookeeper mode, Kafka in
+
[KRaft](https://kafka.apache.org/documentation/#kraft) mode requires each log
+
dir to be "formatted" (which means a cluster-specific metadata file must
+
exist in each log dir)
+
+
The upstream intention is for users to execute the [storage
+
tool](https://kafka.apache.org/documentation/#kraft_storage) to achieve this,
+
but this module contains a few extra options to automate this:
+
+
- [](#opt-services.apache-kafka.clusterId)
+
- [](#opt-services.apache-kafka.formatLogDirs)
+
- [](#opt-services.apache-kafka.formatLogDirsIgnoreFormatted)
+
+
## Migrating to settings {#module-services-apache-kafka-migrating-to-settings}
+
+
Migrating a cluster to the new `settings`-based configuration requires adapting any removed options to the corresponding upstream settings.
+
+
This means that the upstream [Broker Configs documentation](https://kafka.apache.org/documentation/#brokerconfigs) should be followed closely.
+
+
Note that dotted options in the upstream docs do _not_ correspond to nested Nix attrsets, but are instead written as quoted top-level `settings` attributes, as in `services.apache-kafka.settings."broker.id"`, *NOT* `services.apache-kafka.settings.broker.id`.
+
+
Care should be taken, especially when migrating clusters from the old module, to ensure that the same intended configuration is reproduced faithfully via `settings`.
+
+
To assist in the comparison, the final config can be inspected by building the config file itself, ie. with: `nix-build <nixpkgs/nixos> -A config.services.apache-kafka.configFiles.serverProperties`.
+
+
Notable changes to be aware of include:
+
+
- Removal of `services.apache-kafka.extraProperties` and `services.apache-kafka.serverProperties`
+
- Translate arbitrary properties using [](#opt-services.apache-kafka.settings)
+
- [Upstream docs](https://kafka.apache.org/documentation.html#brokerconfigs)
+
- The intention is for all broker properties to be fully representable via [](#opt-services.apache-kafka.settings).
+
- If this is not the case, please do consider raising an issue.
+
- Until it can be remedied, you *can* bail out by setting [](#opt-services.apache-kafka.configFiles.serverProperties) to the path of a fully rendered properties file.
+
+
- Removal of `services.apache-kafka.hostname` and `services.apache-kafka.port`
+
- Translate using: `services.apache-kafka.settings.listeners`
+
- [Upstream docs](https://kafka.apache.org/documentation.html#brokerconfigs_listeners)
+
+
- Removal of `services.apache-kafka.logDirs`
+
- Translate using: `services.apache-kafka.settings."log.dirs"`
+
- [Upstream docs](https://kafka.apache.org/documentation.html#brokerconfigs_log.dirs)
+
+
- Removal of `services.apache-kafka.brokerId`
+
- Translate using: `services.apache-kafka.settings."broker.id"`
+
- [Upstream docs](https://kafka.apache.org/documentation.html#brokerconfigs_broker.id)
+
+
- Removal of `services.apache-kafka.zookeeper`
+
- Translate using: `services.apache-kafka.settings."zookeeper.connect"`
+
- [Upstream docs](https://kafka.apache.org/documentation.html#brokerconfigs_zookeeper.connect)
+61 -24
nixos/tests/kafka.nix
···
with pkgs.lib;
let
-
makeKafkaTest = name: kafkaPackage: (import ./make-test-python.nix ({
+
makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }: (import ./make-test-python.nix ({
inherit name;
meta = with pkgs.lib.maintainers; {
maintainers = [ nequissimus ];
};
nodes = {
+
kafka = { ... }: {
+
services.apache-kafka = mkMerge [
+
({
+
enable = true;
+
package = kafkaPackage;
+
settings = {
+
"offsets.topic.replication.factor" = 1;
+
"log.dirs" = [
+
"/var/lib/kafka/logdir1"
+
"/var/lib/kafka/logdir2"
+
];
+
};
+
})
+
(mkIf (mode == "zookeeper") {
+
settings = {
+
"zookeeper.session.timeout.ms" = 600000;
+
"zookeeper.connect" = [ "zookeeper1:2181" ];
+
};
+
})
+
(mkIf (mode == "kraft") {
+
clusterId = "ak2fIHr4S8WWarOF_ODD0g";
+
formatLogDirs = true;
+
settings = {
+
"node.id" = 1;
+
"process.roles" = [
+
"broker"
+
"controller"
+
];
+
"listeners" = [
+
"PLAINTEXT://:9092"
+
"CONTROLLER://:9093"
+
];
+
"listener.security.protocol.map" = [
+
"PLAINTEXT:PLAINTEXT"
+
"CONTROLLER:PLAINTEXT"
+
];
+
"controller.quorum.voters" = [
+
"1@kafka:9093"
+
];
+
"controller.listener.names" = [ "CONTROLLER" ];
+
};
+
})
+
];
+
+
networking.firewall.allowedTCPPorts = [ 9092 9093 ];
+
# i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
+
virtualisation.memorySize = 2047;
+
};
+
} // optionalAttrs (mode == "zookeeper") {
zookeeper1 = { ... }: {
services.zookeeper = {
enable = true;
···
networking.firewall.allowedTCPPorts = [ 2181 ];
};
-
kafka = { ... }: {
-
services.apache-kafka = {
-
enable = true;
-
extraProperties = ''
-
offsets.topic.replication.factor = 1
-
zookeeper.session.timeout.ms = 600000
-
'';
-
package = kafkaPackage;
-
zookeeper = "zookeeper1:2181";
-
};
-
-
networking.firewall.allowedTCPPorts = [ 9092 ];
-
# i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
-
virtualisation.memorySize = 2047;
-
};
};
testScript = ''
start_all()
+
${optionalString (mode == "zookeeper") ''
zookeeper1.wait_for_unit("default.target")
zookeeper1.wait_for_unit("zookeeper.service")
zookeeper1.wait_for_open_port(2181)
+
''}
kafka.wait_for_unit("default.target")
kafka.wait_for_unit("apache-kafka.service")
···
}) { inherit system; });
in with pkgs; {
-
kafka_2_8 = makeKafkaTest "kafka_2_8" apacheKafka_2_8;
-
kafka_3_0 = makeKafkaTest "kafka_3_0" apacheKafka_3_0;
-
kafka_3_1 = makeKafkaTest "kafka_3_1" apacheKafka_3_1;
-
kafka_3_2 = makeKafkaTest "kafka_3_2" apacheKafka_3_2;
-
kafka_3_3 = makeKafkaTest "kafka_3_3" apacheKafka_3_3;
-
kafka_3_4 = makeKafkaTest "kafka_3_4" apacheKafka_3_4;
-
kafka_3_5 = makeKafkaTest "kafka_3_5" apacheKafka_3_5;
-
kafka = makeKafkaTest "kafka" apacheKafka;
+
kafka_2_8 = makeKafkaTest "kafka_2_8" { kafkaPackage = apacheKafka_2_8; };
+
kafka_3_0 = makeKafkaTest "kafka_3_0" { kafkaPackage = apacheKafka_3_0; };
+
kafka_3_1 = makeKafkaTest "kafka_3_1" { kafkaPackage = apacheKafka_3_1; };
+
kafka_3_2 = makeKafkaTest "kafka_3_2" { kafkaPackage = apacheKafka_3_2; };
+
kafka_3_3 = makeKafkaTest "kafka_3_3" { kafkaPackage = apacheKafka_3_3; };
+
kafka_3_4 = makeKafkaTest "kafka_3_4" { kafkaPackage = apacheKafka_3_4; };
+
kafka_3_5 = makeKafkaTest "kafka_3_5" { kafkaPackage = apacheKafka_3_5; };
+
kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
+
kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; };
}