KAFKA-18396: Migrate log4j1 configuration to log4j2 in KafkaDockerWrapper (#18394)

After log4j migration, we need to update the logging configuration in KafkaDockerWrapper from log4j1 to log4j2.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
TengYao Chi 2025-02-11 15:55:23 +08:00 committed by Manikumar Reddy
parent 8e8423fed0
commit 438fbf5f8e
8 changed files with 565 additions and 124 deletions

View File

@ -1087,6 +1087,7 @@ project(':core') {
implementation libs.jacksonModuleScala
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
implementation libs.jacksonDatabindYaml
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics

View File

@ -74,6 +74,14 @@
<allow class="kafka.server.MetadataCache" />
</subpackage>
<subpackage name="docker">
<allow class="com.fasterxml.jackson.annotation.JsonAnyGetter" />
<allow class="com.fasterxml.jackson.annotation.JsonAnySetter" />
<allow class="com.fasterxml.jackson.annotation.JsonProperty" />
<allow class="com.fasterxml.jackson.annotation.JsonPropertyOrder" />
<allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" />
</subpackage>
<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
</subpackage>

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.docker;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@JsonIgnoreProperties(ignoreUnknown = true)
public class Log4jConfiguration {
private Configuration configuration;
@JsonProperty("Configuration")
public Configuration getConfiguration() {
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
}
@JsonPropertyOrder({ "Properties", "Appenders", "Loggers" })
@JsonIgnoreProperties(ignoreUnknown = true)
class Configuration {
private Properties properties;
private Appenders appenders;
private Loggers loggers;
private final Map<String, Object> additionalProperties = new LinkedHashMap<>();
@JsonProperty("Properties")
public Properties getProperties() {
return properties;
}
public void setProperties(Properties properties) {
this.properties = properties;
}
@JsonProperty("Appenders")
public Appenders getAppenders() {
return appenders;
}
public void setAppenders(Appenders appenders) {
this.appenders = appenders;
}
@JsonProperty("Loggers")
public Loggers getLoggers() {
return loggers;
}
public void setLoggers(Loggers loggers) {
this.loggers = loggers;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperties(String key, Object value) {
additionalProperties.put(key, value);
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Properties {
private final Map<String, Object> properties = new LinkedHashMap<>();
@JsonAnyGetter
public Map<String, Object> getProperties() {
return properties;
}
@JsonAnySetter
public void setProperties(String key, Object value) {
properties.put(key, value);
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Appenders {
private final Map<String, Object> properties = new LinkedHashMap<>();
@JsonAnyGetter
public Map<String, Object> getProperties() {
return properties;
}
@JsonAnySetter
public void setProperties(String key, Object value) {
properties.put(key, value);
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Loggers {
private Root root;
private List<Logger> logger = Collections.emptyList();
@JsonProperty("Root")
public Root getRoot() {
return root;
}
public void setRoot(Root root) {
this.root = root;
}
@JsonProperty("Logger")
public List<Logger> getLogger() {
return logger;
}
public void setLogger(List<Logger> logger) {
this.logger = logger;
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Root {
private String level;
private final Map<String, Object> otherProperties = new LinkedHashMap<>();
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
@JsonAnyGetter
public Map<String, Object> getOtherProperties() {
return otherProperties;
}
@JsonAnySetter
public void setOtherProperties(String key, Object value) {
otherProperties.put(key, value);
}
}
@JsonPropertyOrder({ "name", "level" })
@JsonIgnoreProperties(ignoreUnknown = true)
class Logger {
private String name;
private String level;
private final Map<String, Object> otherProperties = new LinkedHashMap<>();
@JsonProperty("name")
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@JsonProperty("level")
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
@JsonAnyGetter
public Map<String, Object> getOtherProperties() {
return otherProperties;
}
@JsonAnySetter
public void setOtherProperties(String key, Object value) {
otherProperties.put(key, value);
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Logger logger = (Logger) o;
return Objects.equals(name, logger.name) &&
Objects.equals(level, logger.level) &&
Objects.equals(otherProperties, logger.otherProperties);
}
@Override
public int hashCode() {
int result = Objects.hashCode(name);
result = 31 * result + Objects.hashCode(level);
result = 31 * result + Objects.hashCode(otherProperties);
return result;
}
}

View File

@ -16,6 +16,11 @@
*/
package kafka.docker
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.exc.MismatchedInputException
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature
import kafka.Kafka
import kafka.tools.StorageTool
import kafka.utils.Logging
@ -26,6 +31,7 @@ import org.apache.kafka.common.utils.Exit
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption}
import scala.jdk.CollectionConverters._
object KafkaDockerWrapper extends Logging {
def main(args: Array[String]): Unit = {
@ -79,7 +85,7 @@ object KafkaDockerWrapper extends Logging {
required(true).
help(
"""Directory which holds default properties. It should contain the three file:-
|server.properties, log4j.properties and tools-log4j.properties.
|server.properties, log4j2.yaml and tools-log4j2.yaml.
|""".stripMargin)
setupParser.addArgument("--mounted-configs-dir", "-M").
@ -87,7 +93,7 @@ object KafkaDockerWrapper extends Logging {
required(true).
help(
"""Directory which holds user mounted properties. It can contain none to all the three files:-
|server.properties, log4j.properties and tools-log4j.properties.""".stripMargin)
|server.properties, log4j2.yaml and tools-log4j2.yaml.""".stripMargin)
setupParser.addArgument("--final-configs-dir", "-F").
action(store()).
@ -109,8 +115,8 @@ object KafkaDockerWrapper extends Logging {
private def prepareConfigs(defaultConfigsPath: Path, mountedConfigsPath: Path, finalConfigsPath: Path): Unit = {
prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
}
private[docker] def prepareServerConfigs(defaultConfigsPath: Path,
@ -137,36 +143,35 @@ object KafkaDockerWrapper extends Logging {
}
}
private[docker] def prepareLog4jConfigs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val propsToAdd = getLog4jConfigsFromEnv(env)
private[docker] def prepareLog4j2Configs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val loggerFromEnv = getLog4j2ConfigsFromEnv(env)
val rootOption = getLog4j2RootConfigsFromEnv(env)
val defaultFilePath = defaultConfigsPath.resolve(s"$Log4jPropsFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$Log4jPropsFilename")
val finalFilePath = finalConfigsPath.resolve(s"$Log4jPropsFilename")
val defaultFilePath = defaultConfigsPath.resolve(s"$Log4j2PropsFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$Log4j2PropsFilename")
val finalFilePath = finalConfigsPath.resolve(s"$Log4j2PropsFilename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
addToYaml(loggerFromEnv, rootOption, finalFilePath)
}
private[docker] def prepareToolsLog4jConfigs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val propToAdd = getToolsLog4jConfigsFromEnv(env)
val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4jFilename")
val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4jFilename")
val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4jFilename")
private[docker] def prepareToolsLog4j2Configs(defaultConfigsPath: Path,
mountedConfigsPath: Path,
finalConfigsPath: Path,
env: Map[String, String]): Unit = {
val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4j2Filename")
val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4j2Filename")
val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4j2Filename")
copyFile(defaultFilePath, finalFilePath)
copyFile(mountedFilePath, finalFilePath)
addToFile(propToAdd, finalFilePath, StandardOpenOption.APPEND)
addToYaml(Array.empty, getToolsLog4j2ConfigsFromEnv(env), finalFilePath)
}
private[docker] def getServerConfigsFromEnv(env: Map[String, String]): List[String] = {
@ -186,29 +191,36 @@ object KafkaDockerWrapper extends Logging {
.filterNot(_.trim.isEmpty)
}
private[docker] def getLog4jConfigsFromEnv(env: Map[String, String]): String = {
val kafkaLog4jRootLogLevelProp = env.get(KafkaLog4jRootLoglevelEnv)
private[docker] def getLog4j2RootConfigsFromEnv(env: Map[String, String]): Option[Root] = {
env.get(KafkaLog4jRootLoglevelEnv)
.filter(_.nonEmpty)
.map(kafkaLog4jRootLogLevel => s"log4j.rootLogger=$kafkaLog4jRootLogLevel, stdout")
.getOrElse("")
val kafkaLog4jLoggersProp = env.get(KafkaLog4JLoggersEnv)
.filter(_.nonEmpty)
.map {
kafkaLog4JLoggersString =>
kafkaLog4JLoggersString.split(",")
.map(kafkaLog4JLogger => s"log4j.logger.$kafkaLog4JLogger")
.mkString(NewlineChar)
}.getOrElse("")
addNewlinePadding(kafkaLog4jRootLogLevelProp) + addNewlinePadding(kafkaLog4jLoggersProp)
.map(buildRootLogger).getOrElse(Option.empty)
}
private[docker] def getToolsLog4jConfigsFromEnv(env: Map[String, String]): String = {
private[docker] def getToolsLog4j2ConfigsFromEnv(env: Map[String, String]): Option[Root] = {
env.get(KafkaToolsLog4jLoglevelEnv)
.filter(_.nonEmpty)
.map(kafkaToolsLog4jLogLevel => addNewlinePadding(s"log4j.rootLogger=$kafkaToolsLog4jLogLevel, stderr"))
.getOrElse("")
.map(buildRootLogger).getOrElse(Option.empty)
}
private def buildRootLogger(level: String) = {
val root = new Root
root.setLevel(level)
Option.apply(root)
}
private[docker] def getLog4j2ConfigsFromEnv(env: Map[String, String]): Array[Logger] = {
env.get(KafkaLog4JLoggersEnv)
.filter(_.nonEmpty)
.map { loggersString =>
loggersString.split(",").map { e =>
val parts = e.split("=")
val logger = new Logger()
logger.setName(parts(0).trim)
logger.setLevel(parts(1).trim)
logger
}
}.getOrElse(Array.empty[Logger])
}
private def addToFile(properties: String, filepath: Path, mode: StandardOpenOption): Unit = {
@ -219,6 +231,68 @@ object KafkaDockerWrapper extends Logging {
Files.write(filepath, properties.getBytes(StandardCharsets.UTF_8), mode)
}
private def addToYaml(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path): Unit = {
val path = filepath
if (!Files.exists(path)) {
Files.createFile(path)
}
val mapper = new ObjectMapper(new YAMLFactory().disable(Feature.WRITE_DOC_START_MARKER))
.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true)
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
.findAndRegisterModules();
val yaml = try {
mapper.readValue(filepath.toFile, classOf[Log4jConfiguration])
} catch {
case _: MismatchedInputException => new Log4jConfiguration
case e: RuntimeException => throw e
}
val config = yaml.getConfiguration
if (config == null && loggerFromEnv.isEmpty && rootOption.isEmpty) {
return
}
if (config == null) {
generateDefaultLog4jConfig(loggerFromEnv, rootOption, filepath, mapper)
} else {
overrideLog4jConfigByEnv(loggerFromEnv, rootOption, filepath, mapper, yaml, config)
}
}
private def generateDefaultLog4jConfig(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path, mapper: ObjectMapper): Unit = {
val log4jYaml = new Log4jConfiguration
val configuration = new Configuration
val loggers = new Loggers
val root = if (rootOption.isEmpty) {
val root = new Root
// log4j default root logger level
root.setLevel("ERROR")
root
} else rootOption.get
log4jYaml.setConfiguration(configuration)
configuration.setLoggers(loggers)
loggers.setRoot(root)
loggers.setLogger(loggerFromEnv.toList.asJava)
Files.write(filepath, mapper.writeValueAsString(log4jYaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING)
}
private def overrideLog4jConfigByEnv(loggerFromEnv: Array[Logger],
rootOption: Option[Root],
filepath: Path,
mapper: ObjectMapper,
yaml: Log4jConfiguration,
config: Configuration): Unit = {
val nameToLoggers = config.getLoggers.getLogger.asScala.map(logger => (logger.getName, logger)).to(collection.mutable.Map)
loggerFromEnv.foreach(logger => nameToLoggers.put(logger.getName, logger))
config.getLoggers.setLogger(nameToLoggers.values.toList.asJava)
if (rootOption.isDefined) {
config.getLoggers.setRoot(rootOption.get)
}
Files.write(filepath, mapper.writeValueAsString(yaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING)
}
private def copyFile(source: Path, destination: Path) = {
if (Files.exists(source)) {
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING)
@ -238,8 +312,8 @@ object KafkaDockerWrapper extends Logging {
private object Constants {
val ServerPropsFilename = "server.properties"
val Log4jPropsFilename = "log4j.properties"
val ToolsLog4jFilename = "tools-log4j.properties"
val Log4j2PropsFilename = "log4j2.yaml"
val ToolsLog4j2Filename = "tools-log4j2.yaml"
val KafkaLog4JLoggersEnv = "KAFKA_LOG4J_LOGGERS"
val KafkaLog4jRootLoglevelEnv = "KAFKA_LOG4J_ROOT_LOGLEVEL"
val KafkaToolsLog4jLoglevelEnv = "KAFKA_TOOLS_LOG4J_LOGLEVEL"

View File

@ -16,7 +16,7 @@
*/
package kafka.docker
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows}
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import java.nio.charset.StandardCharsets
@ -150,31 +150,35 @@ class KafkaDockerWrapperTest {
"KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR",
"SOME_VARIABLE" -> "Some Value"
)
val expected = "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val kafkaLogger = new Logger
kafkaLogger.setName("kafka")
kafkaLogger.setLevel("INFO")
val requestChannelLogger = new Logger
requestChannelLogger.setName("kafka.network.RequestChannel$")
requestChannelLogger.setLevel("WARN")
val defaultEventHandlerLogger = new Logger
defaultEventHandlerLogger.setName("kafka.producer.async.DefaultEventHandler")
defaultEventHandlerLogger.setLevel("DEBUG")
val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
assertEquals(List.apply(kafkaLogger, requestChannelLogger, defaultEventHandlerLogger), actual.toList)
}
@Test
def testGetLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
val expected = ""
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
assertTrue(actual.isEmpty)
}
@Test
def testGetLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_LOG4J_LOGGERS" -> "", "KAFKA_LOG4J_ROOT_LOGLEVEL" -> "")
val expected = ""
val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars)
assertTrue(actual.isEmpty)
}
@Test
@ -187,18 +191,26 @@ class KafkaDockerWrapperTest {
"SOME_VARIABLE" -> "Some Value"
)
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(defaultConfigsPath.resolve("log4j2.yaml"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j2.yaml"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j2.yaml"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
val expected =
"""Configuration:
| Loggers:
| Root:
| level: "ERROR"
| Logger:
| - name: "kafka"
| level: "INFO"
| - name: "kafka.network.RequestChannel$"
| level: "WARN"
| - name: "kafka.producer.async.DefaultEventHandler"
| level: "DEBUG"""".stripMargin
assertEquals(expected, actual)
}
@ -213,17 +225,47 @@ class KafkaDockerWrapperTest {
"SOME_VARIABLE" -> "Some Value"
)
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val default =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "123"
| Loggers:
| Root:
| level: "123"
| Logger:
| - name: kafka
| level: 123""".stripMargin
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" +
"log4j.logger.kafka=INFO" + "\n" +
"log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" +
"log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG"
KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
val expected =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "123"
| Loggers:
| Root:
| level: "ERROR"
| Logger:
| - name: "kafka.network.RequestChannel$"
| level: "WARN"
| - name: "kafka.producer.async.DefaultEventHandler"
| level: "DEBUG"
| - name: "kafka"
| level: "INFO"""".stripMargin
assertEquals(expected, actual)
}
@ -234,41 +276,69 @@ class KafkaDockerWrapperTest {
val envVars = Map.empty[String, String]
Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val default =
"""
|Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "123"
| Loggers:
| Root:
| level: "123"
| Logger:
| - name: kafka
| level: 123""".stripMargin
KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val mounted =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "[%d] %p %m (%c)%n"
| Loggers:
| Root:
| level: "ERROR"
| Logger:
| - name: "kafka"
| level: "DEBUG"""".stripMargin
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value"
Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
assertEquals(expected, actual)
KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
assertEquals(mounted, actual)
}
@Test
def testGetToolsLog4jConfigsFromEnv(): Unit = {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE", "SOME_VARIABLE" -> "Some Value")
val expected = "\n" + "log4j.rootLogger=TRACE, stderr"
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
assertTrue(actual.isDefined)
assertEquals(actual.get.getLevel, "TRACE")
}
@Test
def testGetToolsLog4jConfigsFromEnvInvalidEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value")
val expected = ""
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
assertTrue(actual.isEmpty)
}
@Test
def testGetToolsLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = {
val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "")
val expected = ""
val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars)
assertEquals(expected, actual)
val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars)
assertTrue(actual.isEmpty)
}
@Test
@ -276,16 +346,59 @@ class KafkaDockerWrapperTest {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
val default =
"""
|Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "123"
| Loggers:
| Root:
| level: "123"
| Logger:
| - name: kafka
| level: 123""".stripMargin
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val mounted =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "[%d] %p %m (%c)%n"
| Loggers:
| Root:
| level: "ERROR"
| Logger:
| - name: "kafka"
| level: "DEBUG"""".stripMargin
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=TRACE, stderr"
KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
val expected =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "[%d] %p %m (%c)%n"
| Loggers:
| Root:
| level: "TRACE"
| Logger:
| - name: "kafka"
| level: "DEBUG"""".stripMargin
assertEquals(expected, actual)
}
@ -296,14 +409,18 @@ class KafkaDockerWrapperTest {
val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE")
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "default.config=default value" + "\n" + "log4j.rootLogger=TRACE, stderr"
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
val expected =
"""Configuration:
| Loggers:
| Root:
| level: "TRACE"""".stripMargin
assertEquals(expected, actual)
}
@ -313,18 +430,47 @@ class KafkaDockerWrapperTest {
val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs()
val envVars = Map.empty[String, String]
val default =
"""
|Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "123"
| Loggers:
| Root:
| level: "123"
| Logger:
| - name: kafka
| level: 123""".stripMargin
Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val mounted =
"""Configuration:
| Appenders:
| Console:
| name: "Console"
| target: "SYSTEM_OUT"
| PatternLayout:
| pattern: "[%d] %p %m (%c)%n"
| Loggers:
| Root:
| level: "ERROR"
| Logger:
| - name: "kafka"
| level: "DEBUG"""".stripMargin
KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit()
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties")
val actual = try source.mkString finally source.close()
val expected = "mounted.config=mounted value"
KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
assertEquals(expected, actual)
val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml")
val actual = try source.mkString.trim finally source.close()
assertEquals(mounted, actual)
}
private def createDirs(): (Path, Path, Path) = {

View File

@ -22,7 +22,6 @@ services:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -38,7 +37,6 @@ services:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -54,7 +52,6 @@ services:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:9093'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'

View File

@ -22,7 +22,6 @@ services:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -38,7 +37,6 @@ services:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -54,7 +52,6 @@ services:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENERS: 'CONTROLLER://:29092'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'

View File

@ -30,7 +30,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -49,7 +48,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
@ -68,7 +66,6 @@ services:
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092'
KAFKA_LISTENERS: 'CONTROLLER://:19092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'