KAFKA-8779 Fix flaky tests introduced by dynamic log levels (#17382)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-10-24 21:39:35 +08:00 committed by GitHub
parent 553e6b4c6d
commit 140d35c545
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 88 additions and 106 deletions

View File

@ -114,6 +114,7 @@
<allow pkg="org.apache.kafka.clients.producer"/> <allow pkg="org.apache.kafka.clients.producer"/>
<allow pkg="org.apache.kafka.coordinator.group"/> <allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.coordinator.transaction"/> <allow pkg="org.apache.kafka.coordinator.transaction"/>
<allow pkg="org.apache.log4j" />
<subpackage name="server"> <subpackage name="server">
<allow pkg="kafka.test" /> <allow pkg="kafka.test" />
</subpackage> </subpackage>

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.requests.{DeleteRecordsRequest} import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
@ -53,8 +53,9 @@ import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.apache.log4j.PropertyConfigurator
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource} import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -81,7 +82,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val topicPartition = new TopicPartition(topic, partition) val topicPartition = new TopicPartition(topic, partition)
private var brokerLoggerConfigResource: ConfigResource = _ private var brokerLoggerConfigResource: ConfigResource = _
private val changedBrokerLoggers = scala.collection.mutable.Set[String]()
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
@ -92,7 +92,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@AfterEach @AfterEach
override def tearDown(): Unit = { override def tearDown(): Unit = {
teardownBrokerLoggers() // Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists
// across test classes. We need to clean up the changes done after testing.
resetLogging()
super.tearDown() super.tearDown()
} }
@ -128,7 +130,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try { try {
val alterLogLevelsEntries = Seq( val alterLogLevelsEntries = Seq(
new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL)
).asJavaCollection ).asJavaCollection
val exception = assertThrows(classOf[ExecutionException], () => { val exception = assertThrows(classOf[ExecutionException], () => {
@ -3071,127 +3073,122 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger")
val loggerConfig = describeBrokerLoggers() val loggerConfig = describeBrokerLoggers()
val kafkaLogLevel = loggerConfig.get("kafka").value() val kafkaLogLevel = loggerConfig.get("kafka").value()
val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica") val clusterReplicaLogLevel = loggerConfig.get("kafka.cluster.Replica")
// we expect the log level to be inherited from the first ancestor with a level configured // we expect the log level to be inherited from the first ancestor with a level configured
assertEquals(kafkaLogLevel, logCleanerLogLevelConfig.value()) assertEquals(kafkaLogLevel, clusterReplicaLogLevel.value())
assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name()) assertEquals("kafka.cluster.Replica", clusterReplicaLogLevel.name())
assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source()) assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, clusterReplicaLogLevel.source())
assertEquals(false, logCleanerLogLevelConfig.isReadOnly) assertEquals(false, clusterReplicaLogLevel.isReadOnly)
assertEquals(false, logCleanerLogLevelConfig.isSensitive) assertEquals(false, clusterReplicaLogLevel.isSensitive)
assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty) assertTrue(clusterReplicaLogLevel.synonyms().isEmpty)
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = { def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
client = createAdminClient client = createAdminClient
val ancestorLogger = "kafka";
val initialLoggerConfig = describeBrokerLoggers() val initialLoggerConfig = describeBrokerLoggers()
val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value() val initialAncestorLogLevel = initialLoggerConfig.get("kafka").value()
assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) val initialControllerServerLogLevel = initialLoggerConfig.get("kafka.server.ControllerServer").value()
assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value()) val initialLogCleanerLogLevel = initialLoggerConfig.get("kafka.log.LogCleaner").value()
assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value()) val initialReplicaManagerLogLevel = initialLoggerConfig.get("kafka.server.ReplicaManager").value()
val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL val newAncestorLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
val alterRootLoggerEntry = Seq( val alterAncestorLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET)
).asJavaCollection ).asJavaCollection
// Test validateOnly does not change anything // Test validateOnly does not change anything
alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true) alterBrokerLoggers(alterAncestorLoggerEntry, validateOnly = true)
val validatedLoggerConfig = describeBrokerLoggers() val validatedLoggerConfig = describeBrokerLoggers()
assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(initialAncestorLogLevel, validatedLoggerConfig.get(ancestorLogger).value())
assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(initialControllerServerLogLevel, validatedLoggerConfig.get("kafka.server.ControllerServer").value())
assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value()) assertEquals(initialLogCleanerLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value())
assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value()) assertEquals(initialReplicaManagerLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value())
assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
// test that we can change them and unset loggers still use the root's log level // test that we can change them and unset loggers still use the root's log level
alterBrokerLoggers(alterRootLoggerEntry) alterBrokerLoggers(alterAncestorLoggerEntry)
val changedRootLoggerConfig = describeBrokerLoggers() val changedAncestorLoggerConfig = describeBrokerLoggers()
assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get(ancestorLogger).value())
assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ControllerServer").value())
assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value()) assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.log.LogCleaner").value())
assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value()) assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ReplicaManager").value())
assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
// alter the ZK client's logger so we can later test resetting it // alter the LogCleaner's logger so we can later test resetting it
val alterZKLoggerEntry = Seq( val alterLogCleanerLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET)
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(alterZKLoggerEntry) alterBrokerLoggers(alterLogCleanerLoggerEntry)
val changedZKLoggerConfig = describeBrokerLoggers() val changedZKLoggerConfig = describeBrokerLoggers()
assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.log.LogCleaner").value())
// properly test various set operations and one delete // properly test various set operations and one delete
val alterLogLevelsEntries = Seq( val alterLogLevelsEntries = Seq(
new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), AlterConfigOp.OpType.DELETE) // should reset to the root logger level
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(alterLogLevelsEntries) alterBrokerLoggers(alterLogLevelsEntries)
val alteredLoggerConfig = describeBrokerLoggers() val alteredLoggerConfig = describeBrokerLoggers()
assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(newAncestorLogLevel, alteredLoggerConfig.get(ancestorLogger).value())
assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ControllerServer").value())
assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value()) assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value())
assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value()) assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value())
assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
} }
/** /**
* 1. Assume ROOT logger == TRACE * 1. Assume kafka logger == TRACE
* 2. Change kafka.controller.KafkaController logger to INFO * 2. Change kafka.server.ControllerServer logger to INFO
* 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE) * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE)
* 4. Change ROOT logger to ERROR * 4. Change kafka logger to ERROR
* 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level) * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level)
*/ */
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = { def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = {
client = createAdminClient client = createAdminClient
// step 1 - configure root logger val ancestorLogger = "kafka"
val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL // step 1 - configure kafka logger
val alterRootLoggerEntry = Seq( val initialAncestorLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET) val alterAncestorLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry(ancestorLogger, initialAncestorLogLevel), AlterConfigOp.OpType.SET)
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(alterRootLoggerEntry) alterBrokerLoggers(alterAncestorLoggerEntry)
val initialLoggerConfig = describeBrokerLoggers() val initialLoggerConfig = describeBrokerLoggers()
assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(initialAncestorLogLevel, initialLoggerConfig.get(ancestorLogger).value())
assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(initialAncestorLogLevel, initialLoggerConfig.get("kafka.server.ControllerServer").value())
// step 2 - change KafkaController logger to INFO // step 2 - change ControllerServer logger to INFO
val alterControllerLoggerEntry = Seq( val alterControllerLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET) new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET)
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(alterControllerLoggerEntry) alterBrokerLoggers(alterControllerLoggerEntry)
val changedControllerLoggerConfig = describeBrokerLoggers() val changedControllerLoggerConfig = describeBrokerLoggers()
assertEquals(initialRootLogLevel, changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(initialAncestorLogLevel, changedControllerLoggerConfig.get(ancestorLogger).value())
assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.server.ControllerServer").value())
// step 3 - unset KafkaController logger // step 3 - unset ControllerServer logger
val deleteControllerLoggerEntry = Seq( val deleteControllerLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE) new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", ""), AlterConfigOp.OpType.DELETE)
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(deleteControllerLoggerEntry) alterBrokerLoggers(deleteControllerLoggerEntry)
val deletedControllerLoggerConfig = describeBrokerLoggers() val deletedControllerLoggerConfig = describeBrokerLoggers()
assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get(ancestorLogger).value())
assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get("kafka.server.ControllerServer").value())
val newRootLogLevel = LogLevelConfig.ERROR_LOG_LEVEL val newAncestorLogLevel = LogLevelConfig.ERROR_LOG_LEVEL
val newAlterRootLoggerEntry = Seq( val newAlterAncestorLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET)
).asJavaCollection ).asJavaCollection
alterBrokerLoggers(newAlterRootLoggerEntry) alterBrokerLoggers(newAlterAncestorLoggerEntry)
val newRootLoggerConfig = describeBrokerLoggers() val newAncestorLoggerConfig = describeBrokerLoggers()
assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get(ancestorLogger).value())
assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value()) assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get("kafka.server.ControllerServer").value())
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
@Disabled // to be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = { def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = {
client = createAdminClient client = createAdminClient
val deleteRootLoggerEntry = Seq( val deleteRootLoggerEntry = Seq(
@ -3203,7 +3200,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = { def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = {
client = createAdminClient client = createAdminClient
val validLoggerName = "kafka.server.KafkaRequestHandler" val validLoggerName = "kafka.server.KafkaRequestHandler"
@ -3216,29 +3212,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported
).asJavaCollection ).asJavaCollection
assertTrue(assertThrows(classOf[ExecutionException], assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(appendLogLevelEntries)).getCause)
() => alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
assertLogLevelDidNotChange() assertLogLevelDidNotChange()
val subtractLogLevelEntries = Seq( val subtractLogLevelEntries = Seq(
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported
).asJavaCollection ).asJavaCollection
assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause)
assertLogLevelDidNotChange() assertLogLevelDidNotChange()
val invalidLogLevelLogLevelEntries = Seq( val invalidLogLevelLogLevelEntries = Seq(
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level
).asJavaCollection ).asJavaCollection
assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause)
assertLogLevelDidNotChange() assertLogLevelDidNotChange()
val invalidLoggerNameLogLevelEntries = Seq( val invalidLoggerNameLogLevelEntries = Seq(
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported
).asJavaCollection ).asJavaCollection
assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause)
assertLogLevelDidNotChange() assertLogLevelDidNotChange()
} }
@ -3252,18 +3247,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
client = createAdminClient client = createAdminClient
val alterLogLevelsEntries = Seq( val alterLogLevelsEntries = Seq(
new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL)
).asJavaCollection ).asJavaCollection
val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava) val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava)
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException]) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
} }
def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = { def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = {
if (!validateOnly) {
for (entry <- entries.asScala)
changedBrokerLoggers.add(entry.configEntry().name())
}
client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly)) client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
.values.get(brokerLoggerConfigResource).get() .values.get(brokerLoggerConfigResource).get()
} }
@ -3271,28 +3261,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def describeBrokerLoggers(): Config = def describeBrokerLoggers(): Config =
client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get() client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get()
/**
* Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes.
* We need to clean up the changes done while testing.
*/
private def teardownBrokerLoggers(): Unit = {
if (changedBrokerLoggers.nonEmpty) {
val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name.equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet
val unsetBrokerLoggersEntries = changedBrokerLoggers
.intersect(validLoggers)
.map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) }
.asJavaCollection
// ensure that we first reset the root logger to an arbitrary log level. Note that we cannot reset it to its original value
alterBrokerLoggers(List(
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET)
).asJavaCollection)
alterBrokerLoggers(unsetBrokerLoggersEntries)
changedBrokerLoggers.clear()
}
}
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = { def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
@ -3607,4 +3575,17 @@ object PlaintextAdminIntegrationTest {
Arguments.of("kraft", "consumer") Arguments.of("kraft", "consumer")
)) ))
} }
/**
* Resets the logging configuration after the test.
*/
def resetLogging(): Unit = {
org.apache.log4j.LogManager.resetConfiguration()
val stream = this.getClass.getResourceAsStream("/log4j.properties")
try {
PropertyConfigurator.configure(stream)
} finally {
stream.close()
}
}
} }