diff --git a/build.gradle b/build.gradle
index 90e79f7fe73..1f47d70af55 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,7 @@ project(':core') {
implementation libs.jacksonModuleScala
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
+ implementation libs.jacksonDatabindYaml
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 4d804a2c723..8de4f5ac536 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -74,6 +74,14 @@
+
+
+
+
+
+
+
+
diff --git a/core/src/main/java/kafka/docker/Log4jConfiguration.java b/core/src/main/java/kafka/docker/Log4jConfiguration.java
new file mode 100644
index 00000000000..45b06760066
--- /dev/null
+++ b/core/src/main/java/kafka/docker/Log4jConfiguration.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Log4jConfiguration {
+ private Configuration configuration;
+
+ @JsonProperty("Configuration")
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ }
+}
+
+@JsonPropertyOrder({ "Properties", "Appenders", "Loggers" })
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Configuration {
+ private Properties properties;
+ private Appenders appenders;
+ private Loggers loggers;
+ private final Map additionalProperties = new LinkedHashMap<>();
+
+ @JsonProperty("Properties")
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ @JsonProperty("Appenders")
+ public Appenders getAppenders() {
+ return appenders;
+ }
+
+ public void setAppenders(Appenders appenders) {
+ this.appenders = appenders;
+ }
+
+ @JsonProperty("Loggers")
+ public Loggers getLoggers() {
+ return loggers;
+ }
+
+ public void setLoggers(Loggers loggers) {
+ this.loggers = loggers;
+ }
+
+ @JsonAnyGetter
+ public Map getAdditionalProperties() {
+ return additionalProperties;
+ }
+
+ @JsonAnySetter
+ public void setAdditionalProperties(String key, Object value) {
+ additionalProperties.put(key, value);
+ }
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Properties {
+ private final Map properties = new LinkedHashMap<>();
+
+ @JsonAnyGetter
+ public Map getProperties() {
+ return properties;
+ }
+
+ @JsonAnySetter
+ public void setProperties(String key, Object value) {
+ properties.put(key, value);
+ }
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Appenders {
+ private final Map properties = new LinkedHashMap<>();
+
+ @JsonAnyGetter
+ public Map getProperties() {
+ return properties;
+ }
+
+ @JsonAnySetter
+ public void setProperties(String key, Object value) {
+ properties.put(key, value);
+ }
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Loggers {
+ private Root root;
+ private List logger = Collections.emptyList();
+
+ @JsonProperty("Root")
+ public Root getRoot() {
+ return root;
+ }
+
+ public void setRoot(Root root) {
+ this.root = root;
+ }
+
+ @JsonProperty("Logger")
+ public List getLogger() {
+ return logger;
+ }
+
+ public void setLogger(List logger) {
+ this.logger = logger;
+ }
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Root {
+ private String level;
+ private final Map otherProperties = new LinkedHashMap<>();
+
+ public String getLevel() {
+ return level;
+ }
+
+ public void setLevel(String level) {
+ this.level = level;
+ }
+
+ @JsonAnyGetter
+ public Map getOtherProperties() {
+ return otherProperties;
+ }
+
+ @JsonAnySetter
+ public void setOtherProperties(String key, Object value) {
+ otherProperties.put(key, value);
+ }
+}
+
+@JsonPropertyOrder({ "name", "level" })
+@JsonIgnoreProperties(ignoreUnknown = true)
+class Logger {
+ private String name;
+ private String level;
+ private final Map otherProperties = new LinkedHashMap<>();
+
+ @JsonProperty("name")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @JsonProperty("level")
+ public String getLevel() {
+ return level;
+ }
+
+ public void setLevel(String level) {
+ this.level = level;
+ }
+
+ @JsonAnyGetter
+ public Map getOtherProperties() {
+ return otherProperties;
+ }
+
+ @JsonAnySetter
+ public void setOtherProperties(String key, Object value) {
+ otherProperties.put(key, value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Logger logger = (Logger) o;
+ return Objects.equals(name, logger.name) &&
+ Objects.equals(level, logger.level) &&
+ Objects.equals(otherProperties, logger.otherProperties);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hashCode(name);
+ result = 31 * result + Objects.hashCode(level);
+ result = 31 * result + Objects.hashCode(otherProperties);
+ return result;
+ }
+}
diff --git a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala
index 2c0238787f8..c0b586d1bf6 100644
--- a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala
+++ b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala
@@ -16,6 +16,11 @@
*/
package kafka.docker
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.exc.MismatchedInputException
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature
import kafka.Kafka
import kafka.tools.StorageTool
import kafka.utils.Logging
@@ -26,6 +31,7 @@ import org.apache.kafka.common.utils.Exit
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption}
+import scala.jdk.CollectionConverters._
object KafkaDockerWrapper extends Logging {
def main(args: Array[String]): Unit = {
@@ -79,7 +85,7 @@ object KafkaDockerWrapper extends Logging {
required(true).
help(
"""Directory which holds default properties. It should contain the three file:-
- |server.properties, log4j.properties and tools-log4j.properties.
+ |server.properties, log4j2.yaml and tools-log4j2.yaml.
|""".stripMargin)
setupParser.addArgument("--mounted-configs-dir", "-M").
@@ -87,7 +93,7 @@ object KafkaDockerWrapper extends Logging {
required(true).
help(
"""Directory which holds user mounted properties. It can contain none to all the three files:-
- |server.properties, log4j.properties and tools-log4j.properties.""".stripMargin)
+ |server.properties, log4j2.yaml and tools-log4j2.yaml.""".stripMargin)
setupParser.addArgument("--final-configs-dir", "-F").
action(store()).
@@ -109,8 +115,8 @@ object KafkaDockerWrapper extends Logging {
private def prepareConfigs(defaultConfigsPath: Path, mountedConfigsPath: Path, finalConfigsPath: Path): Unit = {
prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
- prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
- prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
}
private[docker] def prepareServerConfigs(defaultConfigsPath: Path,
@@ -137,36 +143,35 @@ object KafkaDockerWrapper extends Logging {
}
}
- private[docker] def prepareLog4jConfigs(defaultConfigsPath: Path,
- mountedConfigsPath: Path,
- finalConfigsPath: Path,
- env: Map[String, String]): Unit = {
- val propsToAdd = getLog4jConfigsFromEnv(env)
+ private[docker] def prepareLog4j2Configs(defaultConfigsPath: Path,
+ mountedConfigsPath: Path,
+ finalConfigsPath: Path,
+ env: Map[String, String]): Unit = {
+ val loggerFromEnv = getLog4j2ConfigsFromEnv(env)
+ val rootOption = getLog4j2RootConfigsFromEnv(env)
- val defaultFilePath = defaultConfigsPath.resolve(s"$Log4jPropsFilename")
- val mountedFilePath = mountedConfigsPath.resolve(s"$Log4jPropsFilename")
- val finalFilePath = finalConfigsPath.resolve(s"$Log4jPropsFilename")
+ val defaultFilePath = defaultConfigsPath.resolve(s"$Log4j2PropsFilename")
+ val mountedFilePath = mountedConfigsPath.resolve(s"$Log4j2PropsFilename")
+ val finalFilePath = finalConfigsPath.resolve(s"$Log4j2PropsFilename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
- addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
+ addToYaml(loggerFromEnv, rootOption, finalFilePath)
}
- private[docker] def prepareToolsLog4jConfigs(defaultConfigsPath: Path,
- mountedConfigsPath: Path,
- finalConfigsPath: Path,
- env: Map[String, String]): Unit = {
- val propToAdd = getToolsLog4jConfigsFromEnv(env)
-
- val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4jFilename")
- val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4jFilename")
- val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4jFilename")
+ private[docker] def prepareToolsLog4j2Configs(defaultConfigsPath: Path,
+ mountedConfigsPath: Path,
+ finalConfigsPath: Path,
+ env: Map[String, String]): Unit = {
+ val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4j2Filename")
+ val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4j2Filename")
+ val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4j2Filename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
- addToFile(propToAdd, finalFilePath, StandardOpenOption.APPEND)
+ addToYaml(Array.empty, getToolsLog4j2ConfigsFromEnv(env), finalFilePath)
}
private[docker] def getServerConfigsFromEnv(env: Map[String, String]): List[String] = {
@@ -186,29 +191,36 @@ object KafkaDockerWrapper extends Logging {
.filterNot(_.trim.isEmpty)
}
- private[docker] def getLog4jConfigsFromEnv(env: Map[String, String]): String = {
- val kafkaLog4jRootLogLevelProp = env.get(KafkaLog4jRootLoglevelEnv)
+ private[docker] def getLog4j2RootConfigsFromEnv(env: Map[String, String]): Option[Root] = {
+ env.get(KafkaLog4jRootLoglevelEnv)
.filter(_.nonEmpty)
- .map(kafkaLog4jRootLogLevel => s"log4j.rootLogger=$kafkaLog4jRootLogLevel, stdout")
- .getOrElse("")
-
- val kafkaLog4jLoggersProp = env.get(KafkaLog4JLoggersEnv)
- .filter(_.nonEmpty)
- .map {
- kafkaLog4JLoggersString =>
- kafkaLog4JLoggersString.split(",")
- .map(kafkaLog4JLogger => s"log4j.logger.$kafkaLog4JLogger")
- .mkString(NewlineChar)
- }.getOrElse("")
-
- addNewlinePadding(kafkaLog4jRootLogLevelProp) + addNewlinePadding(kafkaLog4jLoggersProp)
+ .map(buildRootLogger).getOrElse(Option.empty)
}
- private[docker] def getToolsLog4jConfigsFromEnv(env: Map[String, String]): String = {
+ private[docker] def getToolsLog4j2ConfigsFromEnv(env: Map[String, String]): Option[Root] = {
env.get(KafkaToolsLog4jLoglevelEnv)
.filter(_.nonEmpty)
- .map(kafkaToolsLog4jLogLevel => addNewlinePadding(s"log4j.rootLogger=$kafkaToolsLog4jLogLevel, stderr"))
- .getOrElse("")
+ .map(buildRootLogger).getOrElse(Option.empty)
+ }
+
+ private def buildRootLogger(level: String) = {
+ val root = new Root
+ root.setLevel(level)
+ Option.apply(root)
+ }
+
+ private[docker] def getLog4j2ConfigsFromEnv(env: Map[String, String]): Array[Logger] = {
+ env.get(KafkaLog4JLoggersEnv)
+ .filter(_.nonEmpty)
+ .map { loggersString =>
+ loggersString.split(",").map { e =>
+ val parts = e.split("=")
+ val logger = new Logger()
+ logger.setName(parts(0).trim)
+ logger.setLevel(parts(1).trim)
+ logger
+ }
+ }.getOrElse(Array.empty[Logger])
}
private def addToFile(properties: String, filepath: Path, mode: StandardOpenOption): Unit = {
@@ -219,6 +231,68 @@ object KafkaDockerWrapper extends Logging {
Files.write(filepath, properties.getBytes(StandardCharsets.UTF_8), mode)
}
+ private def addToYaml(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path): Unit = {
+ val path = filepath
+ if (!Files.exists(path)) {
+ Files.createFile(path)
+ }
+
+ val mapper = new ObjectMapper(new YAMLFactory().disable(Feature.WRITE_DOC_START_MARKER))
+ .configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true)
+ .setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
+ .findAndRegisterModules();
+
+ val yaml = try {
+ mapper.readValue(filepath.toFile, classOf[Log4jConfiguration])
+ } catch {
+ case _: MismatchedInputException => new Log4jConfiguration
+ case e: RuntimeException => throw e
+ }
+ val config = yaml.getConfiguration
+
+ if (config == null && loggerFromEnv.isEmpty && rootOption.isEmpty) {
+ return
+ }
+
+ if (config == null) {
+ generateDefaultLog4jConfig(loggerFromEnv, rootOption, filepath, mapper)
+ } else {
+ overrideLog4jConfigByEnv(loggerFromEnv, rootOption, filepath, mapper, yaml, config)
+ }
+ }
+
+ private def generateDefaultLog4jConfig(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path, mapper: ObjectMapper): Unit = {
+ val log4jYaml = new Log4jConfiguration
+ val configuration = new Configuration
+ val loggers = new Loggers
+ val root = if (rootOption.isEmpty) {
+ val root = new Root
+ // log4j default root logger level
+ root.setLevel("ERROR")
+ root
+ } else rootOption.get
+ log4jYaml.setConfiguration(configuration)
+ configuration.setLoggers(loggers)
+ loggers.setRoot(root)
+ loggers.setLogger(loggerFromEnv.toList.asJava)
+ Files.write(filepath, mapper.writeValueAsString(log4jYaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING)
+ }
+
+ private def overrideLog4jConfigByEnv(loggerFromEnv: Array[Logger],
+ rootOption: Option[Root],
+ filepath: Path,
+ mapper: ObjectMapper,
+ yaml: Log4jConfiguration,
+ config: Configuration): Unit = {
+ val nameToLoggers = config.getLoggers.getLogger.asScala.map(logger => (logger.getName, logger)).to(collection.mutable.Map)
+ loggerFromEnv.foreach(logger => nameToLoggers.put(logger.getName, logger))
+ config.getLoggers.setLogger(nameToLoggers.values.toList.asJava)
+ if (rootOption.isDefined) {
+ config.getLoggers.setRoot(rootOption.get)
+ }
+ Files.write(filepath, mapper.writeValueAsString(yaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING)
+ }
+
private def copyFile(source: Path, destination: Path) = {
if (Files.exists(source)) {
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING)
@@ -238,8 +312,8 @@ object KafkaDockerWrapper extends Logging {
private object Constants {
val ServerPropsFilename = "server.properties"
- val Log4jPropsFilename = "log4j.properties"
- val ToolsLog4jFilename = "tools-log4j.properties"
+ val Log4j2PropsFilename = "log4j2.yaml"
+ val ToolsLog4j2Filename = "tools-log4j2.yaml"
val KafkaLog4JLoggersEnv = "KAFKA_LOG4J_LOGGERS"
val KafkaLog4jRootLoglevelEnv = "KAFKA_LOG4J_ROOT_LOGLEVEL"
val KafkaToolsLog4jLoglevelEnv = "KAFKA_TOOLS_LOG4J_LOGLEVEL"
diff --git a/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala b/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala
index d01644de1fd..81dcad01a99 100644
--- a/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala
+++ b/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.docker
-import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import java.nio.charset.StandardCharsets
@@ -150,31 +150,35 @@ class KafkaDockerWrapperTest {
"KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR",
"SOME_VARIABLE" -> "Some Value"
)
- val expected = "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
- "log4j.logger.kafka=INFO" + "\n" +
- "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
- "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
- val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val kafkaLogger = new Logger
+ kafkaLogger.setName("kafka")
+ kafkaLogger.setLevel("INFO")
+
+ val requestChannelLogger = new Logger
+ requestChannelLogger.setName("kafka.network.RequestChannel$")
+ requestChannelLogger.setLevel("WARN")
+
+ val defaultEventHandlerLogger = new Logger
+ defaultEventHandlerLogger.setName("kafka.producer.async.DefaultEventHandler")
+ defaultEventHandlerLogger.setLevel("DEBUG")
+
+ val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
+ assertEquals(List.apply(kafkaLogger, requestChannelLogger, defaultEventHandlerLogger), actual.toList)
}
@Test
def testGetLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
- val expected = ""
-
- val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
+ assertTrue(actual.isEmpty)
}
@Test
def testGetLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_LOG4J_LOGGERS" -> "", "KAFKA_LOG4J_ROOT_LOGLEVEL" -> "")
- val expected = ""
-
- val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
+ assertTrue(actual.isEmpty)
}
@Test
@@ -187,18 +191,26 @@ class KafkaDockerWrapperTest {
"SOME_VARIABLE" -> "Some Value"
)
- Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(defaultConfigsPath.resolve("log4j2.yaml"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(mountedConfigsPath.resolve("log4j2.yaml"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("log4j2.yaml"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
- "log4j.logger.kafka=INFO" + "\n" +
- "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
- "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+ val expected =
+ """Configuration:
+ | Loggers:
+ | Root:
+ | level: "ERROR"
+ | Logger:
+ | - name: "kafka"
+ | level: "INFO"
+ | - name: "kafka.network.RequestChannel$"
+ | level: "WARN"
+ | - name: "kafka.producer.async.DefaultEventHandler"
+ | level: "DEBUG"""".stripMargin
assertEquals(expected, actual)
}
@@ -213,17 +225,47 @@ class KafkaDockerWrapperTest {
"SOME_VARIABLE" -> "Some Value"
)
- Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ val default =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "123"
+ | Loggers:
+ | Root:
+ | level: "123"
+ | Logger:
+ | - name: kafka
+ | level: 123""".stripMargin
- KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "default.config=default value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
- "log4j.logger.kafka=INFO" + "\n" +
- "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
- "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
+ KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+
+ val expected =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "123"
+ | Loggers:
+ | Root:
+ | level: "ERROR"
+ | Logger:
+ | - name: "kafka.network.RequestChannel$"
+ | level: "WARN"
+ | - name: "kafka.producer.async.DefaultEventHandler"
+ | level: "DEBUG"
+ | - name: "kafka"
+ | level: "INFO"""".stripMargin
assertEquals(expected, actual)
}
@@ -234,41 +276,69 @@ class KafkaDockerWrapperTest {
val envVars = Map.empty[String, String]
- Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ val default =
+ """
+ |Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "123"
+ | Loggers:
+ | Root:
+ | level: "123"
+ | Logger:
+ | - name: kafka
+ | level: 123""".stripMargin
- KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ val mounted =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "[%d] %p %m (%c)%n"
+ | Loggers:
+ | Root:
+ | level: "ERROR"
+ | Logger:
+ | - name: "kafka"
+ | level: "DEBUG"""".stripMargin
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "mounted.config=mounted value"
+ Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(mountedConfigsPath.resolve("log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- assertEquals(expected, actual)
+ KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+
+ assertEquals(mounted, actual)
}
@Test
def testGetToolsLog4jConfigsFromEnv(): Unit = {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE", "SOME_VARIABLE" -> "Some Value")
- val expected = "\n" + "log4j.rootLogger=TRACE, stderr"
- val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
+ assertTrue(actual.isDefined)
+ assertEquals(actual.get.getLevel, "TRACE")
}
@Test
def testGetToolsLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
- val expected = ""
- val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
+ assertTrue(actual.isEmpty)
}
@Test
def testGetToolsLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "")
- val expected = ""
- val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
- assertEquals(expected, actual)
+ val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
+ assertTrue(actual.isEmpty)
}
@Test
@@ -276,16 +346,59 @@ class KafkaDockerWrapperTest {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
+ val default =
+ """
+ |Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "123"
+ | Loggers:
+ | Root:
+ | level: "123"
+ | Logger:
+ | - name: kafka
+ | level: 123""".stripMargin
- Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ val mounted =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "[%d] %p %m (%c)%n"
+ | Loggers:
+ | Root:
+ | level: "ERROR"
+ | Logger:
+ | - name: "kafka"
+ | level: "DEBUG"""".stripMargin
- KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=TRACE, stderr"
+ KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+ val expected =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "[%d] %p %m (%c)%n"
+ | Loggers:
+ | Root:
+ | level: "TRACE"
+ | Logger:
+ | - name: "kafka"
+ | level: "DEBUG"""".stripMargin
assertEquals(expected, actual)
}
@@ -296,14 +409,18 @@ class KafkaDockerWrapperTest {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
- Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "default.config=default value" + "\n" + "log4j.rootLogger=TRACE, stderr"
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+ val expected =
+ """Configuration:
+ | Loggers:
+ | Root:
+ | level: "TRACE"""".stripMargin
assertEquals(expected, actual)
}
@@ -313,18 +430,47 @@ class KafkaDockerWrapperTest {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
+ val default =
+ """
+ |Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "123"
+ | Loggers:
+ | Root:
+ | level: "123"
+ | Logger:
+ | - name: kafka
+ | level: 123""".stripMargin
- Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ val mounted =
+ """Configuration:
+ | Appenders:
+ | Console:
+ | name: "Console"
+ | target: "SYSTEM_OUT"
+ | PatternLayout:
+ | pattern: "[%d] %p %m (%c)%n"
+ | Loggers:
+ | Root:
+ | level: "ERROR"
+ | Logger:
+ | - name: "kafka"
+ | level: "DEBUG"""".stripMargin
- KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
+ Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
+ Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
- val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
- val actual = try source.mkString finally source.close()
- val expected = "mounted.config=mounted value"
+ KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
- assertEquals(expected, actual)
+ val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
+ val actual = try source.mkString.trim finally source.close()
+
+ assertEquals(mounted, actual)
}
private def createDirs(): (Path, Path, Path) = {
diff --git a/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml b/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml
index edf71567ebe..54ecc00531a 100644
--- a/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml
+++ b/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml
@@ -22,7 +22,6 @@ services:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -38,7 +37,6 @@ services:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -54,7 +52,6 @@ services:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
diff --git a/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml b/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml
index e3590508fef..f4b1ddb382c 100644
--- a/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml
+++ b/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml
@@ -22,7 +22,6 @@ services:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -38,7 +37,6 @@ services:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -54,7 +52,6 @@ services:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
diff --git a/docker/test/fixtures/mode/isolated/docker-compose.yml b/docker/test/fixtures/mode/isolated/docker-compose.yml
index 5e27cbe5607..0dcf8d2f876 100644
--- a/docker/test/fixtures/mode/isolated/docker-compose.yml
+++ b/docker/test/fixtures/mode/isolated/docker-compose.yml
@@ -30,7 +30,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -49,7 +48,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@@ -68,7 +66,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
- KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'