# NixOS module for running an Apache Kafka broker as a systemd service.
#
# Declares the `services.apache-kafka.*` options and, when the service is
# enabled, creates the `apache-kafka` user, a generated configuration
# directory (server.properties + log4j.properties) and the systemd unit.
{ config, lib, pkgs, ... }:

with lib;

let
  cfg = config.services.apache-kafka;

  # Contents of server.properties: the user-supplied text when
  # `serverProperties` is set, otherwise a file generated from the
  # individual options below.
  serverProperties =
    if cfg.serverProperties != null then
      cfg.serverProperties
    else
      ''
        # Generated by nixos
        broker.id=${toString cfg.brokerId}
        port=${toString cfg.port}
        host.name=${cfg.hostname}
        log.dirs=${concatStringsSep "," cfg.logDirs}
        zookeeper.connect=${cfg.zookeeper}
        ${toString cfg.extraProperties}
      '';

  # Directory holding both property files. It is put on the JVM classpath
  # and server.properties inside it is passed to kafka.Kafka as its argument.
  configDir = pkgs.buildEnv {
    name = "apache-kafka-conf";
    paths = [
      (pkgs.writeTextDir "server.properties" serverProperties)
      (pkgs.writeTextDir "log4j.properties" cfg.log4jProperties)
    ];
  };

  # Log directories as a shell-safe, space-separated argument list, so that
  # paths containing spaces or shell metacharacters survive the preStart
  # script intact.
  quotedLogDirs = concatStringsSep " " (map escapeShellArg cfg.logDirs);

in {

  options.services.apache-kafka = {
    enable = mkOption {
      description = "Whether to enable Apache Kafka.";
      default = false;
      type = types.bool;
    };

    brokerId = mkOption {
      description = "Broker ID.";
      default = 0;
      type = types.int;
    };

    port = mkOption {
      description = "Port number the broker should listen on.";
      default = 9092;
      type = types.int;
    };

    hostname = mkOption {
      description = "Hostname the broker should bind to.";
      default = "localhost";
      # types.str (not the deprecated types.string): conflicting definitions
      # are reported as an error instead of being concatenated.
      type = types.str;
    };

    logDirs = mkOption {
      description = "Log file directories";
      default = [ "/tmp/kafka-logs" ];
      type = types.listOf types.path;
    };

    zookeeper = mkOption {
      description = "Zookeeper connection string";
      default = "localhost:2181";
      # types.str for the same reason as `hostname` above.
      type = types.str;
    };

    extraProperties = mkOption {
      description = "Extra properties for server.properties.";
      type = types.nullOr types.lines;
      default = null;
    };

    serverProperties = mkOption {
      description = ''
        Complete server.properties content. Other server.properties config
        options will be ignored if this option is used.
      '';
      type = types.nullOr types.lines;
      default = null;
    };

    log4jProperties = mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout

        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = types.lines;
    };

    jvmOptions = mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [
        "-server"
        "-Xmx1G"
        "-Xms1G"
        "-XX:+UseCompressedOops"
        "-XX:+UseParNewGC"
        "-XX:+UseConcMarkSweepGC"
        "-XX:+CMSClassUnloadingEnabled"
        "-XX:+CMSScavengeBeforeRemark"
        "-XX:+DisableExplicitGC"
        "-Djava.awt.headless=true"
        "-Djava.net.preferIPv4Stack=true"
      ];
      type = types.listOf types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

    package = mkOption {
      description = "The kafka package to use";
      default = pkgs.apacheKafka;
      defaultText = "pkgs.apacheKafka";
      type = types.package;
    };

  };

  config = mkIf cfg.enable {

    environment.systemPackages = [cfg.package];

    # Dedicated service account; its home is the first configured log
    # directory, so `logDirs` must contain at least one entry.
    users.extraUsers = singleton {
      name = "apache-kafka";
      uid = config.ids.uids.apache-kafka;
      description = "Apache Kafka daemon user";
      home = head cfg.logDirs;
    };

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network-interfaces.target" ];
      serviceConfig = {
        ExecStart = ''
          ${pkgs.jre}/bin/java \
            -cp "${cfg.package}/libs/*:${configDir}" \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${configDir}/server.properties
        '';
        User = "apache-kafka";
        # With PermissionsStartOnly, User= applies only to ExecStart, so the
        # preStart script below can still create and chown the directories.
        PermissionsStartOnly = true;
        # 143 = 128 + SIGTERM; presumably the JVM's exit status on a normal
        # `systemctl stop` - treated as a clean shutdown.
        SuccessExitStatus = "0 143";
      };
      # Create the log directories and, when running as root, hand them over
      # to the service user before the broker starts.
      preStart = ''
        mkdir -m 0700 -p ${quotedLogDirs}
        if [ "$(id -u)" = 0 ]; then
          chown apache-kafka ${quotedLogDirs};
        fi
      '';
    };

  };
}