KAFKA-16016: Add docker wrapper in core and remove docker utility script (#15048)

Migrates the functionality provided by the utility script to Kafka core. This wrapper will be used to generate property files and format storage when invoked from the docker container.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Vedarth Sharma 2024-01-08 18:07:38 +05:30 committed by GitHub
parent da2aa68269
commit 116762fdce
17 changed files with 625 additions and 900 deletions
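
For illustration, a minimal sketch of how the new entry point is driven; the directory paths mirror the ones the launch script passes below, and a real run also needs CLUSTER_ID set in the environment:

KafkaDockerWrapper.main(Array(
  "setup",
  "--default-configs-dir", "/etc/kafka/docker",
  "--mounted-configs-dir", "/mnt/shared/config",
  "--final-configs-dir", "/opt/kafka/config"))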

View File

@@ -208,7 +208,6 @@ if (repo != null) {
'licenses/*',
'**/generated/**',
'clients/src/test/resources/serializedData/*',
'docker/resources/utility/go.sum',
'docker/test/fixtures/secrets/*',
'docker/examples/fixtures/secrets/*'
])

View File

@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.docker
import kafka.tools.StorageTool
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.store
import net.sourceforge.argparse4j.inf.Namespace
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption}
object KafkaDockerWrapper {
def main(args: Array[String]): Unit = {
val namespace = parseArguments(args)
val command = namespace.getString("command")
command match {
case "setup" =>
val defaultConfigsPath = Paths.get(namespace.getString("default_configs_dir"))
val mountedConfigsPath = Paths.get(namespace.getString("mounted_configs_dir"))
val finalConfigsPath = Paths.get(namespace.getString("final_configs_dir"))
try {
prepareConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath)
} catch {
case e: Throwable =>
val errMsg = s"error while preparing configs: ${e.getMessage}"
System.err.println(errMsg)
Exit.exit(1, Some(errMsg))
}
val formatCmd = formatStorageCmd(finalConfigsPath, envVars)
StorageTool.main(formatCmd)
case _ =>
throw new RuntimeException(s"Unknown operation $command. " +
s"Please provide a valid operation: 'setup'.")
}
}
import Constants._
private def parseArguments(args: Array[String]): Namespace = {
val parser = ArgumentParsers.
newArgumentParser("kafka-docker-wrapper", true, "-", "@").
description("The Kafka docker wrapper.")
val subparsers = parser.addSubparsers().dest("command")
val setupParser = subparsers.addParser("setup")
setupParser.addArgument("--default-configs-dir", "-D").
action(store()).
required(true).
help(
"""Directory which holds default properties. It should contain the three files:
|server.properties, log4j.properties and tools-log4j.properties.
|""".stripMargin)
setupParser.addArgument("--mounted-configs-dir", "-M").
action(store()).
required(true).
help(
"""Directory which holds user mounted properties. It can contain anywhere from none to all three of these files:
|server.properties, log4j.properties and tools-log4j.properties.""".stripMargin)
setupParser.addArgument("--final-configs-dir", "-F").
action(store()).
required(true).
help(
"""Directory which holds the final properties that will be used to boot Kafka.
|""".stripMargin)
parser.parseArgsOrFail(args)
}
private def formatStorageCmd(configsPath: Path, env: Map[String, String]): Array[String] = {
// env.get returns an Option; stringifying it directly would render "Some(...)", so unwrap it first.
val clusterId = env.getOrElse("CLUSTER_ID",
throw new RuntimeException("CLUSTER_ID environment variable is not set."))
Array("format", "--cluster-id=" + clusterId, "-c", s"${configsPath.toString}/server.properties")
}
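// For example, with the (illustrative) environment CLUSTER_ID=4L6g3nShT-eMCtK--X86sw and
// configsPath /opt/kafka/config, formatStorageCmd yields:
//   Array("format", "--cluster-id=4L6g3nShT-eMCtK--X86sw", "-c", "/opt/kafka/config/server.properties")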
private def prepareConfigs(defaultConfigsPath: Path, mountedConfigsPath: Path, finalConfigsPath: Path): Unit = {
prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
}
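// Precedence for server.properties: a user-mounted file wins over the bundled default,
// KAFKA_* environment variables are appended on top, and the default file is used only
// when the merged result is empty (see the tests below).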
private[docker] def prepareServerConfigs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val propsToAdd = addNewlinePadding(getServerConfigsFromEnv(env).mkString(NewlineChar))
val defaultFilePath = defaultConfigsPath.resolve(s"$ServerPropsFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$ServerPropsFilename")
val finalFilePath = finalConfigsPath.resolve(s"$ServerPropsFilename")
if (Files.exists(mountedFilePath)) {
copyFile(mountedFilePath, finalFilePath)
addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
} else {
addToFile(propsToAdd, finalFilePath, StandardOpenOption.TRUNCATE_EXISTING)
}
val source = scala.io.Source.fromFile(finalFilePath.toString)
val data = try source.mkString finally source.close()
if (data.trim.isEmpty) {
copyFile(defaultFilePath, finalFilePath)
}
}
private[docker] def prepareLog4jConfigs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val propsToAdd = getLog4jConfigsFromEnv(env)
val defaultFilePath = defaultConfigsPath.resolve(s"$Log4jPropsFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$Log4jPropsFilename")
val finalFilePath = finalConfigsPath.resolve(s"$Log4jPropsFilename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
}
private[docker] def prepareToolsLog4jConfigs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val propToAdd = getToolsLog4jConfigsFromEnv(env)
val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4jFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4jFilename")
val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4jFilename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
addToFile(propToAdd, finalFilePath, StandardOpenOption.APPEND)
}
private[docker] def getServerConfigsFromEnv(env: Map[String, String]): List[String] = {
env.map {
case (key, value) =>
if (key.startsWith("KAFKA_") && !ExcludeServerPropsEnv.contains(key)) {
// Strip the KAFKA_ prefix, lower-case the rest and map underscores to property
// punctuation: "___" becomes "-", "__" becomes "_" and "_" becomes ".".
val finalKey = key.replace("KAFKA_", "").toLowerCase()
.replace("_", ".")
.replace("...", "-")
.replace("..", "_")
finalKey + "=" + value
} else {
""
}
}
.toList
.filterNot(_.trim.isEmpty)
}
private[docker] def getLog4jConfigsFromEnv(env: Map[String, String]): String = {
val kafkaLog4jRootLogLevelProp = env.get(KafkaLog4jRootLoglevelEnv)
.filter(_.nonEmpty)
.map(kafkaLog4jRootLogLevel => s"log4j.rootLogger=$kafkaLog4jRootLogLevel, stdout")
.getOrElse("")
val kafkaLog4jLoggersProp = env.get(KafkaLog4JLoggersEnv)
.filter(_.nonEmpty)
.map {
kafkaLog4JLoggersString =>
kafkaLog4JLoggersString.split(",")
.map(kafkaLog4JLogger => s"log4j.logger.$kafkaLog4JLogger")
.mkString(NewlineChar)
}.getOrElse("")
addNewlinePadding(kafkaLog4jRootLogLevelProp) + addNewlinePadding(kafkaLog4jLoggersProp)
}
private[docker] def getToolsLog4jConfigsFromEnv(env: Map[String, String]): String = {
env.get(KafkaToolsLog4jLoglevelEnv)
.filter(_.nonEmpty)
.map(kafkaToolsLog4jLogLevel => addNewlinePadding(s"log4j.rootLogger=$kafkaToolsLog4jLogLevel, stderr"))
.getOrElse("")
}
private def addToFile(properties: String, filepath: Path, mode: StandardOpenOption): Unit = {
if (!Files.exists(filepath)) {
Files.createFile(filepath)
}
Files.write(filepath, properties.getBytes(StandardCharsets.UTF_8), mode)
}
private def copyFile(source: Path, destination: Path) = {
if (Files.exists(source)) {
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING)
}
}
private def addNewlinePadding(str: String): String = {
if (str.nonEmpty) {
NewlineChar + str
} else {
""
}
}
private def envVars: Map[String, String] = sys.env
}
private object Constants {
val ServerPropsFilename = "server.properties"
val Log4jPropsFilename = "log4j.properties"
val ToolsLog4jFilename = "tools-log4j.properties"
val KafkaLog4JLoggersEnv = "KAFKA_LOG4J_LOGGERS"
val KafkaLog4jRootLoglevelEnv = "KAFKA_LOG4J_ROOT_LOGLEVEL"
val KafkaToolsLog4jLoglevelEnv = "KAFKA_TOOLS_LOG4J_LOGLEVEL"
val ExcludeServerPropsEnv: Set[String] = Set(
"KAFKA_VERSION",
"KAFKA_HEAP_OPT",
"KAFKA_LOG4J_OPTS",
"KAFKA_OPTS",
"KAFKA_JMX_OPTS",
"KAFKA_JVM_PERFORMANCE_OPTS",
"KAFKA_GC_LOG_OPTS",
"KAFKA_LOG4J_ROOT_LOGLEVEL",
"KAFKA_LOG4J_LOGGERS",
"KAFKA_TOOLS_LOG4J_LOGLEVEL",
"KAFKA_JMX_HOSTNAME")
val NewlineChar = "\n"
}

View File

@@ -0,0 +1,325 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.docker
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
class KafkaDockerWrapperTest {
@Test
def testGetServerConfigsFromEnv(): Unit = {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE",
"KAFKA_VALID_PROPERTY" -> "Value",
"SOME_VARIABLE" -> "Some Value",
"KAFKA_VALID___PROPERTY__ALL_CASES" -> "All Cases Value")
val expected = List("valid.property=Value", "valid-property_all.cases=All Cases Value")
val actual = KafkaDockerWrapper.getServerConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testPrepareServerConfigs(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_ENV_CONFIG" -> "env value")
Files.write(defaultConfigsPath.resolve("server.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("server.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("server.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/server.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value" + "\n" + "env.config=env value"
assertEquals(expected, actual)
}
@Test
def testPrepareServerConfigsWithoutMountedFile(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_ENV_CONFIG" -> "env value")
Files.write(defaultConfigsPath.resolve("server.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("server.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/server.properties")
val actual = try source.mkString finally source.close()
val expected = "\n" + "env.config=env value"
assertEquals(expected, actual)
}
@Test
def testPrepareServerConfigsWithoutEnvVariables(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("server.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("server.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("server.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/server.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value"
assertEquals(expected, actual)
}
@Test
def testPrepareServerConfigsWithoutUserInput(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("server.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("server.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/server.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value"
assertEquals(expected, actual)
}
@Test
def testPrepareServerConfigsWithEmptyMountedFile(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("server.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("server.properties"), " \n \n ".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("server.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/server.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value"
assertEquals(expected, actual)
}
@Test
def testGetLog4jConfigsFromEnv(): Unit = {
val envVars = Map(
"KAFKA_LOG4J_LOGGERS" -> "kafka=INFO,kafka.network.RequestChannel$=WARN,kafka.producer.async.DefaultEventHandler=DEBUG,",
"KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR",
"SOME_VARIABLE" -> "Some Value"
)
val expected = "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testGetLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
val expected = ""
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testGetLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_LOG4J_LOGGERS" -> "", "KAFKA_LOG4J_ROOT_LOGLEVEL" -> "")
val expected = ""
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testPrepareLog4jConfigs(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map(
"KAFKA_LOG4J_LOGGERS" -> "kafka=INFO,kafka.network.RequestChannel$=WARN,kafka.producer.async.DefaultEventHandler=DEBUG,",
"KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR",
"SOME_VARIABLE" -> "Some Value"
)
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
assertEquals(expected, actual)
}
@Test
def testPrepareLog4jConfigsWithoutMountedFile(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map(
"KAFKA_LOG4J_LOGGERS" -> "kafka=INFO,kafka.network.RequestChannel$=WARN,kafka.producer.async.DefaultEventHandler=DEBUG,",
"KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR",
"SOME_VARIABLE" -> "Some Value"
)
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
assertEquals(expected, actual)
}
@Test
def testPrepareLog4jConfigsWithoutEnvVariables(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value"
assertEquals(expected, actual)
}
@Test
def testGetToolsLog4jConfigsFromEnv(): Unit = {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE", "SOME_VARIABLE" -> "Some Value")
val expected = "\n" + "log4j.rootLogger=TRACE, stderr"
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testGetToolsLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
val expected = ""
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testGetToolsLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "")
val expected = ""
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
}
@Test
def testPrepareToolsLog4jConfigs(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=TRACE, stderr"
assertEquals(expected, actual)
}
@Test
def testPrepareToolsLog4jConfigsWithoutMountedFile(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value" + "\n" + "log4j.rootLogger=TRACE, stderr"
assertEquals(expected, actual)
}
@Test
def testPrepareToolsLog4jConfigsWithoutEnvVariable(): Unit = {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value"
assertEquals(expected, actual)
}
private def createDirs(): (Path, Path, Path) = {
val defaultConfigsPath = Files.createTempDirectory("tmp")
val mountedConfigsPath = Files.createTempDirectory("tmp")
val finalConfigsPath = Files.createTempDirectory("tmp")
defaultConfigsPath.toFile.deleteOnExit()
mountedConfigsPath.toFile.deleteOnExit()
finalConfigsPath.toFile.deleteOnExit()
(defaultConfigsPath, mountedConfigsPath, finalConfigsPath)
}
}

View File

@@ -16,17 +16,6 @@
# limitations under the License.
###############################################################################
FROM golang:latest AS build-utility
WORKDIR /build
RUN useradd --no-log-init --create-home --shell /bin/bash appuser
COPY --chown=appuser:appuser resources/utility/ ./
# Generate utility executable for dealing with env variables
RUN go build -ldflags="-w -s" ./utility.go
USER appuser
RUN go test ./...
FROM eclipse-temurin:21-jre-alpine AS build-jsa
USER root
@@ -89,13 +78,13 @@ RUN set -eux ; \
chmod -R ug+w /etc/kafka /var/lib/kafka /etc/kafka/secrets; \
cp /opt/kafka/config/log4j.properties /etc/kafka/docker/log4j.properties; \
cp /opt/kafka/config/tools-log4j.properties /etc/kafka/docker/tools-log4j.properties; \
cp /opt/kafka/config/kraft/server.properties /etc/kafka/docker/server.properties; \
rm kafka.tgz kafka.tgz.asc KEYS; \
apk del wget gpg gpg-agent; \
apk cache clean;
COPY --from=build-jsa kafka.jsa /opt/kafka/kafka.jsa
COPY --from=build-jsa storage.jsa /opt/kafka/storage.jsa
COPY --chown=appuser:appuser --from=build-utility /build/utility /usr/bin
COPY --chown=appuser:appuser resources/common-scripts /etc/kafka/docker
COPY --chown=appuser:appuser launch /etc/kafka/docker/launch

View File

@@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Override this section from the script to include the com.sun.management.jmxremote.rmi.port property.
if [ -z "$KAFKA_JMX_OPTS" ]; then
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
if [ -z "${KAFKA_JMX_OPTS-}" ]; then
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false "
fi
# The JMX client needs to be able to connect to java.rmi.server.hostname.
@@ -28,27 +29,34 @@ fi
# the default is to pick the first IP (or network).
export KAFKA_JMX_HOSTNAME=${KAFKA_JMX_HOSTNAME:-$(hostname -i | cut -d" " -f1)}
if [ "$KAFKA_JMX_PORT" ]; then
if [ "${KAFKA_JMX_PORT-}" ]; then
# This ensures that the "if" section for JMX_PORT in kafka launch script does not trigger.
export JMX_PORT=$KAFKA_JMX_PORT
export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT"
export KAFKA_JMX_OPTS="${KAFKA_JMX_OPTS-} -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT \
-Dcom.sun.management.jmxremote.port=$JMX_PORT"
fi
# Make a temp env variable to store user provided performance opts
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
if [ -z "${KAFKA_JVM_PERFORMANCE_OPTS-}" ]; then
export TEMP_KAFKA_JVM_PERFORMANCE_OPTS=""
else
export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS"
fi
# First use the storage CDS archive while formatting storage
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:SharedArchiveFile=/opt/kafka/storage.jsa"
export KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-} -XX:SharedArchiveFile=/opt/kafka/storage.jsa"
echo "===> Using provided cluster id $CLUSTER_ID ..."
# A bit of a hack to not error out if the storage is already formatted. Need storage-tool to support this
result=$(/opt/kafka/bin/kafka-storage.sh format --cluster-id=$CLUSTER_ID -c /opt/kafka/config/server.properties 2>&1) || \
echo $result | grep -i "already formatted" || \
{ echo $result && (exit 1) }
# Invoke the docker wrapper to set up property files and format storage
result=$(/opt/kafka/bin/kafka-run-class.sh kafka.docker.KafkaDockerWrapper setup \
--default-configs-dir /etc/kafka/docker \
--mounted-configs-dir /mnt/shared/config \
--final-configs-dir /opt/kafka/config 2>&1) || \
echo $result | grep -i "already formatted" || \
{ echo $result && (exit 1) }
# Using temp env variable to get rid of storage CDS command
export KAFKA_JVM_PERFORMANCE_OPTS="$TEMP_KAFKA_JVM_PERFORMANCE_OPTS"

View File

@@ -14,11 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
ensure() {
if [[ -z "${!1}" ]]; then
echo "$1 environment variable not set"
exit 1
fi
}
path() {
if [[ $2 == "writable" ]]; then
if [[ ! -w "$1" ]]; then
echo "$1 file not writable"
exit 1
fi
elif [[ $2 == "existence" ]]; then
if [[ ! -e "$1" ]]; then
echo "$1 file does not exist"
exit 1
fi
fi
}
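# For example, the checks below invoke these helpers as:
#   ensure CLUSTER_ID                  # exits if CLUSTER_ID is unset
#   path /opt/kafka/config/ writable   # exits if the config directory is not writable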
# unset KAFKA_ADVERTISED_LISTENERS from ENV in KRaft mode when running as controller only
if [[ -n "${KAFKA_PROCESS_ROLES-}" ]]
then
echo "Running in KRaft mode..."
utility ensure CLUSTER_ID
ensure CLUSTER_ID
if [[ $KAFKA_PROCESS_ROLES == "controller" ]]
then
if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]]
@@ -35,44 +56,44 @@ fi
# By default, LISTENERS is derived from ADVERTISED_LISTENERS by replacing
# hosts with 0.0.0.0. This is a good default as it ensures that the broker
# process listens on all network interfaces.
if [[ -z "${KAFKA_LISTENERS-}" ]] && ( [[ -z "${KAFKA_PROCESS_ROLES-}" ]] || [[ $KAFKA_PROCESS_ROLES != "controller" ]] ) && [[ -n "${KAFKA_ADVERTISED_LISTENERS}" ]]
if [[ -z "${KAFKA_LISTENERS-}" ]] && ( [[ -z "${KAFKA_PROCESS_ROLES-}" ]] || [[ $KAFKA_PROCESS_ROLES != "controller" ]] ) && [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]]
then
export KAFKA_LISTENERS
KAFKA_LISTENERS=$(echo "$KAFKA_ADVERTISED_LISTENERS" | sed -e 's|://[^:]*:|://0.0.0.0:|g')
fi
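# For example, a (hypothetical) KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker-1:9092,SSL://broker-1:9093
# would yield KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093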
utility path /opt/kafka/config/ writable
path /opt/kafka/config/ writable
# Set if ADVERTISED_LISTENERS has SSL:// or SASL_SSL:// endpoints.
if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]] && [[ $KAFKA_ADVERTISED_LISTENERS == *"SSL://"* ]]
then
echo "SSL is enabled."
utility ensure KAFKA_SSL_KEYSTORE_FILENAME
ensure KAFKA_SSL_KEYSTORE_FILENAME
export KAFKA_SSL_KEYSTORE_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEYSTORE_FILENAME"
utility path "$KAFKA_SSL_KEYSTORE_LOCATION" existence
path "$KAFKA_SSL_KEYSTORE_LOCATION" existence
utility ensure KAFKA_SSL_KEY_CREDENTIALS
ensure KAFKA_SSL_KEY_CREDENTIALS
KAFKA_SSL_KEY_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEY_CREDENTIALS"
utility path "$KAFKA_SSL_KEY_CREDENTIALS_LOCATION" existence
path "$KAFKA_SSL_KEY_CREDENTIALS_LOCATION" existence
export KAFKA_SSL_KEY_PASSWORD
KAFKA_SSL_KEY_PASSWORD=$(cat "$KAFKA_SSL_KEY_CREDENTIALS_LOCATION")
utility ensure KAFKA_SSL_KEYSTORE_CREDENTIALS
ensure KAFKA_SSL_KEYSTORE_CREDENTIALS
KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEYSTORE_CREDENTIALS"
utility path "$KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION" existence
path "$KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION" existence
export KAFKA_SSL_KEYSTORE_PASSWORD
KAFKA_SSL_KEYSTORE_PASSWORD=$(cat "$KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION")
if [[ -n "${KAFKA_SSL_CLIENT_AUTH-}" ]] && ( [[ $KAFKA_SSL_CLIENT_AUTH == *"required"* ]] || [[ $KAFKA_SSL_CLIENT_AUTH == *"requested"* ]] )
then
utility ensure KAFKA_SSL_TRUSTSTORE_FILENAME
ensure KAFKA_SSL_TRUSTSTORE_FILENAME
export KAFKA_SSL_TRUSTSTORE_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_TRUSTSTORE_FILENAME"
utility path "$KAFKA_SSL_TRUSTSTORE_LOCATION" existence
path "$KAFKA_SSL_TRUSTSTORE_LOCATION" existence
utility ensure KAFKA_SSL_TRUSTSTORE_CREDENTIALS
ensure KAFKA_SSL_TRUSTSTORE_CREDENTIALS
KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_TRUSTSTORE_CREDENTIALS"
utility path "$KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION" existence
path "$KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION" existence
export KAFKA_SSL_TRUSTSTORE_PASSWORD
KAFKA_SSL_TRUSTSTORE_PASSWORD=$(cat "$KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION")
fi
@@ -83,7 +104,7 @@ if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]] && [[ $KAFKA_ADVERTISED_LISTENERS =
then
echo "SASL" is enabled.
utility ensure KAFKA_OPTS
ensure KAFKA_OPTS
if [[ ! $KAFKA_OPTS == *"java.security.auth.login.config"* ]]
then
@@ -98,33 +119,3 @@ then
echo "KAFKA_OPTS should contain 'com.sun.management.jmxremote.rmi.port' property. It is required for accessing the JMX metrics externally."
fi
fi
# Copy the bundled log4j.properties and tools-log4j.properties. This is done to handle property modification during container restart
cp /etc/kafka/docker/log4j.properties /opt/kafka/config/log4j.properties
cp /etc/kafka/docker/tools-log4j.properties /opt/kafka/config/tools-log4j.properties
# Copy all the user provided property files through file input
cp -R /mnt/shared/config/. /opt/kafka/config/
# Check the presence of user provided kafka configs via file input
if [ -e "/mnt/shared/config/server.properties" ]
then
echo "User provided kafka configs found via file input. Any properties provided via env variables will be appended to this."
# Append configs provided via env variables.
echo -e "\n$(utility render-properties /etc/kafka/docker/kafka-propertiesSpec.json)" >> /opt/kafka/config/server.properties
else
# Create the kafka config property file using user provided environment variables.
echo -e "\n$(utility render-properties /etc/kafka/docker/kafka-propertiesSpec.json)" > /opt/kafka/config/server.properties
if grep -q '[^[:space:]]' "/opt/kafka/config/server.properties"; then
echo "User provided kafka configs found via environment variables."
fi
fi
# If no user provided kafka configs found, use default configs
if ! grep -q '[^[:space:]]' "/opt/kafka/config/server.properties"; then
echo "User provided kafka configs not found (neither via file input nor via environment variables). Falling back to default configs."
cp /opt/kafka/config/kraft/server.properties /opt/kafka/config/server.properties
fi
echo -e "\n$(utility render-template /etc/kafka/docker/kafka-log4j.properties.template)" >> /opt/kafka/config/log4j.properties
echo -e "\n$(utility render-template /etc/kafka/docker/kafka-tools-log4j.properties.template)" >> /opt/kafka/config/tools-log4j.properties

View File

@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{ with $value := getEnv "KAFKA_LOG4J_ROOT_LOGLEVEL" "INFO" }}{{ if ne $value "INFO" }}
log4j.rootLogger={{ $value }}, stdout
{{ end }}{{ end }}
{{ $loggers := getEnv "KAFKA_LOG4J_LOGGERS" "" -}}
{{ range $k, $v := splitToMapDefaults "," "" $loggers}}
log4j.logger.{{ $k }}={{ $v -}}
{{ end }}

View File

@@ -1,22 +0,0 @@
{
"prefixes": {
"KAFKA": false
},
"renamed": {
},
"excludes": [
"KAFKA_VERSION",
"KAFKA_HEAP_OPT",
"KAFKA_LOG4J_OPTS",
"KAFKA_OPTS",
"KAFKA_JMX_OPTS",
"KAFKA_JVM_PERFORMANCE_OPTS",
"KAFKA_GC_LOG_OPTS",
"KAFKA_LOG4J_ROOT_LOGLEVEL",
"KAFKA_LOG4J_LOGGERS",
"KAFKA_TOOLS_LOG4J_LOGLEVEL"
],
"defaults": {
},
"excludeWithPrefix": ""
}

View File

@@ -1,17 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{ with $value := getEnv "KAFKA_TOOLS_LOG4J_LOGLEVEL" "WARN"}} {{if ne $value "WARN"}}
log4j.rootLogger={{ $value }}, stderr
{{ end }}{{ end }}

View File

@@ -32,7 +32,7 @@ echo "===> Setting default values of environment variables if not already set."
. /etc/kafka/docker/configureDefaults
echo "===> Configuring ..."
/etc/kafka/docker/configure
. /etc/kafka/docker/configure
echo "===> Launching ... "
exec /etc/kafka/docker/launch
. /etc/kafka/docker/launch

View File

@@ -1,29 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
module ub
go 1.19
require (
github.com/spf13/cobra v1.7.0
golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c
golang.org/x/sys v0.7.0
)
require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

View File

@@ -1,14 +0,0 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c h1:HDdYQYKOkvJT/Plb5HwJJywTVyUnIctjQm6XSnZ/0CY=
golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,14 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,14 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,20 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger={{ getEnv "KAFKA_LOG4J_ROOT_LOGLEVEL" "INFO" }}, stdout
{{$loggers := getEnv "KAFKA_LOG4J_LOGGERS" "" -}}
{{ range $k, $v := splitToMapDefaults "," "" $loggers}}
log4j.logger.{{ $k }}={{ $v -}}
{{ end }}

View File

@@ -1,323 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/signal"
pt "path"
"regexp"
"sort"
"strings"
"text/template"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"
"golang.org/x/sys/unix"
)
type ConfigSpec struct {
Prefixes map[string]bool `json:"prefixes"`
Excludes []string `json:"excludes"`
Renamed map[string]string `json:"renamed"`
Defaults map[string]string `json:"defaults"`
ExcludeWithPrefix string `json:"excludeWithPrefix"`
}
var (
re = regexp.MustCompile("[^_]_[^_]")
ensureCmd = &cobra.Command{
Use: "ensure <environment-variable>",
Short: "checks if environment variable is set or not",
Args: cobra.ExactArgs(1),
RunE: runEnsureCmd,
}
pathCmd = &cobra.Command{
Use: "path <path-to-file> <operation>",
Short: "checks if an operation is permitted on a file",
Args: cobra.ExactArgs(2),
RunE: runPathCmd,
}
renderTemplateCmd = &cobra.Command{
Use: "render-template <path-to-template>",
Short: "renders template to stdout",
Args: cobra.ExactArgs(1),
RunE: runRenderTemplateCmd,
}
renderPropertiesCmd = &cobra.Command{
Use: "render-properties <path-to-config-spec>",
Short: "creates and renders properties to stdout using the json config spec.",
Args: cobra.ExactArgs(1),
RunE: runRenderPropertiesCmd,
}
)
func ensure(envVar string) bool {
_, found := os.LookupEnv(envVar)
return found
}
func path(filePath string, operation string) (bool, error) {
switch operation {
case "readable":
err := unix.Access(filePath, unix.R_OK)
if err != nil {
return false, err
}
return true, nil
case "executable":
info, err := os.Stat(filePath)
if err != nil {
err = fmt.Errorf("error checking executable status of file %q: %w", filePath, err)
return false, err
}
return info.Mode()&0111 != 0, nil //check whether file is executable by anyone, use 0100 to check for execution rights for owner
case "existence":
if _, err := os.Stat(filePath); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
case "writable":
err := unix.Access(filePath, unix.W_OK)
if err != nil {
return false, err
}
return true, nil
default:
err := fmt.Errorf("unknown operation %q", operation)
return false, err
}
}
func renderTemplate(templateFilePath string) error {
funcs := template.FuncMap{
"getEnv": getEnvOrDefault,
"splitToMapDefaults": splitToMapDefaults,
}
t, err := template.New(pt.Base(templateFilePath)).Funcs(funcs).ParseFiles(templateFilePath)
if err != nil {
err = fmt.Errorf("error %q: %w", templateFilePath, err)
return err
}
return buildTemplate(os.Stdout, *t)
}
func buildTemplate(writer io.Writer, template template.Template) error {
err := template.Execute(writer, GetEnvironment())
if err != nil {
err = fmt.Errorf("error building template file : %w", err)
return err
}
return nil
}
func renderConfig(writer io.Writer, configSpec ConfigSpec) error {
return writeConfig(writer, buildProperties(configSpec, GetEnvironment()))
}
// ConvertKey converts an environment variable name to a property name according to the following rules:
// - a single underscore (_) is replaced with a .
// - a double underscore (__) is replaced with a single underscore
// - a triple underscore (___) is replaced with a dash
// Moreover, the whole string is converted to lower-case.
// The behavior of sequences of four or more underscores is undefined.
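// For example: "KEY_FOO" becomes "key.foo", "KEY__UNDERSCORE" becomes "key_underscore"
// and "KEY___DASH" becomes "key-dash" (see Test_convertKey in the test file below).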
func ConvertKey(key string) string {
singleReplaced := re.ReplaceAllStringFunc(key, replaceUnderscores)
singleTripleReplaced := strings.ReplaceAll(singleReplaced, "___", "-")
return strings.ToLower(strings.ReplaceAll(singleTripleReplaced, "__", "_"))
}
// replaceUnderscores replaces every underscore '_' by a dot '.'
func replaceUnderscores(s string) string {
return strings.ReplaceAll(s, "_", ".")
}
// ListToMap splits each entry of the kvList argument at '=' into a key/value pair and returns a map of all the k/v pairs thus obtained.
// This method will only consider values in the list formatted as key=value.
func ListToMap(kvList []string) map[string]string {
m := make(map[string]string, len(kvList))
for _, l := range kvList {
parts := strings.Split(l, "=")
if len(parts) == 2 {
m[parts[0]] = parts[1]
}
}
return m
}
func splitToMapDefaults(separator string, defaultValues string, value string) map[string]string {
values := KvStringToMap(defaultValues, separator)
for k, v := range KvStringToMap(value, separator) {
values[k] = v
}
return values
}
func KvStringToMap(kvString string, sep string) map[string]string {
return ListToMap(strings.Split(kvString, sep))
}
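// For example, splitToMapDefaults(",", "kafka=INFO,state.change.logger=TRACE", "kafka=WARN")
// yields {"kafka": "WARN", "state.change.logger": "TRACE"}: explicit values override the
// defaults (see Test_splitToMapDefaults below).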
// GetEnvironment returns the current environment as a map.
func GetEnvironment() map[string]string {
return ListToMap(os.Environ())
}
// buildProperties creates a map suitable to be output as Java properties from a ConfigSpec and a map representing an environment.
func buildProperties(spec ConfigSpec, environment map[string]string) map[string]string {
config := make(map[string]string)
for key, value := range spec.Defaults {
config[key] = value
}
for envKey, envValue := range environment {
if newKey, found := spec.Renamed[envKey]; found {
config[newKey] = envValue
} else {
if !slices.Contains(spec.Excludes, envKey) && !(len(spec.ExcludeWithPrefix) > 0 && strings.HasPrefix(envKey, spec.ExcludeWithPrefix)) {
for prefix, keep := range spec.Prefixes {
if strings.HasPrefix(envKey, prefix) {
var effectiveKey string
if keep {
effectiveKey = envKey
} else {
effectiveKey = envKey[len(prefix)+1:]
}
config[ConvertKey(effectiveKey)] = envValue
}
}
}
}
}
return config
}
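// For example, with Prefixes = {"KAFKA": false} the environment entry
// KAFKA_BOOTSTRAP_SERVERS=localhost:9092 is emitted as bootstrap.servers=localhost:9092,
// while keys listed in Excludes or matching ExcludeWithPrefix are dropped
// (see Test_buildProperties below).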
func writeConfig(writer io.Writer, config map[string]string) error {
// Go randomizes iterations over map by design. We sort properties by name to ease debugging:
sortedNames := make([]string, 0, len(config))
for name := range config {
sortedNames = append(sortedNames, name)
}
sort.Strings(sortedNames)
for _, n := range sortedNames {
_, err := fmt.Fprintf(writer, "%s=%s\n", n, config[n])
if err != nil {
err = fmt.Errorf("error printing configs: %w", err)
return err
}
}
return nil
}
func loadConfigSpec(path string) (ConfigSpec, error) {
var spec ConfigSpec
bytes, err := os.ReadFile(path)
if err != nil {
err = fmt.Errorf("error reading from json file %q : %w", path, err)
return spec, err
}
errParse := json.Unmarshal(bytes, &spec)
if errParse != nil {
err = fmt.Errorf("error parsing json file %q : %w", path, errParse)
return spec, err
}
return spec, nil
}
func getEnvOrDefault(envVar string, defaultValue string) string {
val := os.Getenv(envVar)
if len(val) == 0 {
return defaultValue
}
return val
}
func runEnsureCmd(_ *cobra.Command, args []string) error {
success := ensure(args[0])
if !success {
err := fmt.Errorf("environment variable %q is not set", args[0])
return err
}
return nil
}
func runPathCmd(_ *cobra.Command, args []string) error {
success, err := path(args[0], args[1])
if err != nil {
err = fmt.Errorf("error in checking operation %q on file %q: %w", args[1], args[0], err)
return err
}
if !success {
err = fmt.Errorf("operation %q on file %q is unsuccessful", args[1], args[0])
return err
}
return nil
}
func runRenderTemplateCmd(_ *cobra.Command, args []string) error {
err := renderTemplate(args[0])
if err != nil {
err = fmt.Errorf("error in rendering template %q: %w", args[0], err)
return err
}
return nil
}
func runRenderPropertiesCmd(_ *cobra.Command, args []string) error {
configSpec, err := loadConfigSpec(args[0])
if err != nil {
err = fmt.Errorf("error in loading config from file %q: %w", args[0], err)
return err
}
err = renderConfig(os.Stdout, configSpec)
if err != nil {
err = fmt.Errorf("error in building properties from file %q: %w", args[0], err)
return err
}
return nil
}
func main() {
rootCmd := &cobra.Command{
Use: "utility",
Short: "utility commands for kafka docker images",
Run: func(cmd *cobra.Command, args []string) {},
}
rootCmd.AddCommand(pathCmd)
rootCmd.AddCommand(ensureCmd)
rootCmd.AddCommand(renderTemplateCmd)
rootCmd.AddCommand(renderPropertiesCmd)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "error in executing the command: %s", err)
os.Exit(1)
}
}

View File

@@ -1,355 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"os"
"reflect"
"testing"
)
func assertEqual(a string, b string, t *testing.T) {
if a != b {
t.Error(a + " != " + b)
}
}
func Test_ensure(t *testing.T) {
type args struct {
envVar string
}
err := os.Setenv("ENV_VAR", "value")
if err != nil {
t.Fatal("Unable to set ENV_VAR for the test")
}
tests := []struct {
name string
args args
want bool
}{
{
name: "should exist",
args: args{
envVar: "ENV_VAR",
},
want: true,
},
{
name: "should not exist",
args: args{
envVar: "RANDOM_ENV_VAR",
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ensure(tt.args.envVar); got != tt.want {
t.Errorf("ensure() = %v, want %v", got, tt.want)
}
})
}
}
func Test_path(t *testing.T) {
type args struct {
filePath string
operation string
}
const (
sampleFile = "testResources/sampleFile"
sampleFile2 = "testResources/sampleFile2"
fileDoesNotExist = "testResources/sampleFile3"
)
err := os.Chmod(sampleFile, 0777)
if err != nil {
t.Error("Unable to set permissions for the file")
}
err = os.Chmod(sampleFile2, 0000)
if err != nil {
t.Error("Unable to set permissions for the file")
}
tests := []struct {
name string
args args
want bool
wantErr bool
}{
{
name: "file readable",
args: args{filePath: sampleFile,
operation: "readable"},
want: true,
wantErr: false,
},
{
name: "file writable",
args: args{filePath: sampleFile,
operation: "writable"},
want: true,
wantErr: false,
},
{
name: "file executable",
args: args{filePath: sampleFile,
operation: "executable"},
want: true,
wantErr: false,
},
{
name: "file existence",
args: args{filePath: sampleFile,
operation: "existence"},
want: true,
wantErr: false,
},
{
name: "file not readable",
args: args{filePath: sampleFile2,
operation: "readable"},
want: false,
wantErr: true,
},
{
name: "file not writable",
args: args{filePath: sampleFile2,
operation: "writable"},
want: false,
wantErr: true,
},
{
name: "file not executable",
args: args{filePath: sampleFile2,
operation: "executable"},
want: false,
wantErr: false,
},
{
name: "file does not exist",
args: args{filePath: fileDoesNotExist,
operation: "existence"},
want: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := path(tt.args.filePath, tt.args.operation)
if (err != nil) != tt.wantErr {
t.Errorf("path() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Errorf("path() = %v, want %v", got, tt.want)
}
})
}
}
func Test_renderTemplate(t *testing.T) {
type args struct {
templateFilePath string
}
const (
fileExistsAndRenderable = "testResources/sampleLog4j.template"
fileDoesNotExist = "testResources/RandomFileName"
)
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "render template success",
args: args{templateFilePath: fileExistsAndRenderable},
wantErr: false,
},
{
name: "render template failure ",
args: args{templateFilePath: fileDoesNotExist},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := renderTemplate(tt.args.templateFilePath); (err != nil) != tt.wantErr {
t.Errorf("renderTemplate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_convertKey(t *testing.T) {
type args struct {
key string
}
tests := []struct {
name string
args args
wantString string
}{
{
name: "Capitals",
args: args{key: "KEY"},
wantString: "key",
},
{
name: "Capitals with underscore",
args: args{key: "KEY_FOO"},
wantString: "key.foo",
},
{
name: "Capitals with double underscore",
args: args{key: "KEY__UNDERSCORE"},
wantString: "key_underscore",
},
{
name: "Capitals with double and single underscore",
args: args{key: "KEY_WITH__UNDERSCORE_AND__MORE"},
wantString: "key.with_underscore.and_more",
},
{
name: "Capitals with triple underscore",
args: args{key: "KEY___DASH"},
wantString: "key-dash",
},
{
name: "capitals with double,triple and single underscore",
args: args{key: "KEY_WITH___DASH_AND___MORE__UNDERSCORE"},
wantString: "key.with-dash.and-more_underscore",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if result := ConvertKey(tt.args.key); result != tt.wantString {
t.Errorf("ConvertKey() result = %v, wantStr %v", result, tt.wantString)
}
})
}
}
func Test_buildProperties(t *testing.T) {
type args struct {
spec ConfigSpec
environment map[string]string
}
tests := []struct {
name string
args args
want map[string]string
}{
{
name: "only defaults",
args: args{
spec: ConfigSpec{
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
},
environment: map[string]string{
"PATH": "thePath",
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_IGNORED": "ignored",
"KAFKA_EXCLUDE_PREFIX_PROPERTY": "ignored",
},
},
want: map[string]string{"bootstrap.servers": "unknown", "default.property.key": "default.property.value"},
},
{
name: "server properties",
args: args{
spec: ConfigSpec{
Prefixes: map[string]bool{"KAFKA": false},
Excludes: []string{"KAFKA_IGNORED"},
Renamed: map[string]string{},
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
ExcludeWithPrefix: "KAFKA_EXCLUDE_PREFIX_",
},
environment: map[string]string{
"PATH": "thePath",
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_IGNORED": "ignored",
"KAFKA_EXCLUDE_PREFIX_PROPERTY": "ignored",
},
},
want: map[string]string{"bootstrap.servers": "localhost:9092", "default.property.key": "default.property.value"},
},
{
name: "kafka properties",
args: args{
spec: ConfigSpec{
Prefixes: map[string]bool{"KAFKA": false},
Excludes: []string{"KAFKA_IGNORED"},
Renamed: map[string]string{},
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
ExcludeWithPrefix: "KAFKA_EXCLUDE_PREFIX_",
},
environment: map[string]string{
"KAFKA_FOO": "foo",
"KAFKA_FOO_BAR": "bar",
"KAFKA_IGNORED": "ignored",
"KAFKA_WITH__UNDERSCORE": "with underscore",
"KAFKA_WITH__UNDERSCORE_AND_MORE": "with underscore and more",
"KAFKA_WITH___DASH": "with dash",
"KAFKA_WITH___DASH_AND_MORE": "with dash and more",
},
},
want: map[string]string{"bootstrap.servers": "unknown", "default.property.key": "default.property.value", "foo": "foo", "foo.bar": "bar", "with-dash": "with dash", "with-dash.and.more": "with dash and more", "with_underscore": "with underscore", "with_underscore.and.more": "with underscore and more"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildProperties(tt.args.spec, tt.args.environment); !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildProperties() = %v, want %v", got, tt.want)
}
})
}
}
func Test_splitToMapDefaults(t *testing.T) {
type args struct {
separator string
defaultValues string
value string
}
tests := []struct {
name string
args args
want map[string]string
}{
{
name: "split to default",
args: args{
separator: ",",
defaultValues: "kafka=INFO,kafka.producer.async.DefaultEventHandler=DEBUG,state.change.logger=TRACE",
value: "kafka.producer.async.DefaultEventHandler=ERROR,kafka.request.logger=WARN",
},
want: map[string]string{"kafka": "INFO", "kafka.producer.async.DefaultEventHandler": "ERROR", "kafka.request.logger": "WARN", "state.change.logger": "TRACE"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := splitToMapDefaults(tt.args.separator, tt.args.defaultValues, tt.args.value); !reflect.DeepEqual(got, tt.want) {
t.Errorf("splitToMapDefaults() = %v, want %v", got, tt.want)
}
})
}
}