From 140d35c5459f4c7a91b23a238467cb5aa01b59fb Mon Sep 17 00:00:00 2001
From: Kuan-Po Tseng
Date: Thu, 24 Oct 2024 21:39:35 +0800
Subject: [PATCH] KAFKA-8779 Fix flaky tests introduced by dynamic log levels
 (#17382)

Reviewers: Chia-Ping Tsai
---
 checkstyle/import-control-core.xml          |   1 +
 .../api/PlaintextAdminIntegrationTest.scala | 193 ++++++++----------
 2 files changed, 88 insertions(+), 106 deletions(-)

diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index a2d46bc8c1c..b56e4c09670 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -114,6 +114,7 @@
+
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7c429283179..d563886ac5b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
-import org.apache.kafka.common.requests.{DeleteRecordsRequest}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -53,8 +53,9 @@ import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
+import org.apache.log4j.PropertyConfigurator
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
 import org.slf4j.LoggerFactory
@@ -81,7 +82,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   val topicPartition = new TopicPartition(topic, partition)
 
   private var brokerLoggerConfigResource: ConfigResource = _
-  private val changedBrokerLoggers = scala.collection.mutable.Set[String]()
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
@@ -92,7 +92,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
   @AfterEach
   override def tearDown(): Unit = {
-    teardownBrokerLoggers()
+    // Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists
+    // across test classes. We need to clean up the changes done after testing.
+    resetLogging()
     super.tearDown()
   }
 
@@ -128,7 +130,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     try {
      val alterLogLevelsEntries = Seq(
-        new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
+        new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL)
      ).asJavaCollection
 
      val exception = assertThrows(classOf[ExecutionException], () => {
@@ -3071,127 +3073,122 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger")
     val loggerConfig = describeBrokerLoggers()
     val kafkaLogLevel = loggerConfig.get("kafka").value()
-    val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
+    val clusterReplicaLogLevel = loggerConfig.get("kafka.cluster.Replica")
     // we expect the log level to be inherited from the first ancestor with a level configured
-    assertEquals(kafkaLogLevel, logCleanerLogLevelConfig.value())
-    assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name())
-    assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source())
-    assertEquals(false, logCleanerLogLevelConfig.isReadOnly)
-    assertEquals(false, logCleanerLogLevelConfig.isSensitive)
-    assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
+    assertEquals(kafkaLogLevel, clusterReplicaLogLevel.value())
+    assertEquals("kafka.cluster.Replica", clusterReplicaLogLevel.name())
+    assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, clusterReplicaLogLevel.source())
+    assertEquals(false, clusterReplicaLogLevel.isReadOnly)
+    assertEquals(false, clusterReplicaLogLevel.isSensitive)
+    assertTrue(clusterReplicaLogLevel.synonyms().isEmpty)
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
   def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = createAdminClient
+    val ancestorLogger = "kafka";
 
     val initialLoggerConfig = describeBrokerLoggers()
-    val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()
-    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
-    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value())
-    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value())
+    val initialAncestorLogLevel = initialLoggerConfig.get("kafka").value()
+    val initialControllerServerLogLevel = initialLoggerConfig.get("kafka.server.ControllerServer").value()
+    val initialLogCleanerLogLevel = initialLoggerConfig.get("kafka.log.LogCleaner").value()
+    val initialReplicaManagerLogLevel = initialLoggerConfig.get("kafka.server.ReplicaManager").value()
 
-    val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
-    val alterRootLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+    val newAncestorLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
+    val alterAncestorLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET)
     ).asJavaCollection
     // Test validateOnly does not change anything
-    alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true)
+    alterBrokerLoggers(alterAncestorLoggerEntry, validateOnly = true)
     val validatedLoggerConfig = describeBrokerLoggers()
-    assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value())
-    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value())
-    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value())
-    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+    assertEquals(initialAncestorLogLevel, validatedLoggerConfig.get(ancestorLogger).value())
+    assertEquals(initialControllerServerLogLevel, validatedLoggerConfig.get("kafka.server.ControllerServer").value())
+    assertEquals(initialLogCleanerLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(initialReplicaManagerLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value())
 
     // test that we can change them and unset loggers still use the root's log level
-    alterBrokerLoggers(alterRootLoggerEntry)
-    val changedRootLoggerConfig = describeBrokerLoggers()
-    assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value())
-    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value())
-    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value())
-    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+    alterBrokerLoggers(alterAncestorLoggerEntry)
+    val changedAncestorLoggerConfig = describeBrokerLoggers()
+    assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get(ancestorLogger).value())
+    assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ControllerServer").value())
+    assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ReplicaManager").value())
 
-    // alter the ZK client's logger so we can later test resetting it
-    val alterZKLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET)
+    // alter the LogCleaner's logger so we can later test resetting it
+    val alterLogCleanerLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET)
     ).asJavaCollection
-    alterBrokerLoggers(alterZKLoggerEntry)
+    alterBrokerLoggers(alterLogCleanerLoggerEntry)
     val changedZKLoggerConfig = describeBrokerLoggers()
-    assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+    assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.log.LogCleaner").value())
 
     // properly test various set operations and one delete
     val alterLogLevelsEntries = Seq(
-      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET),
      new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET),
      new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET),
-      new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), AlterConfigOp.OpType.DELETE) // should reset to the root logger level
    ).asJavaCollection
     alterBrokerLoggers(alterLogLevelsEntries)
     val alteredLoggerConfig = describeBrokerLoggers()
-    assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(newAncestorLogLevel, alteredLoggerConfig.get(ancestorLogger).value())
+    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ControllerServer").value())
     assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value())
     assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value())
-    assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
   }
 
   /**
-   * 1. Assume ROOT logger == TRACE
-   * 2. Change kafka.controller.KafkaController logger to INFO
-   * 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE)
-   * 4. Change ROOT logger to ERROR
-   * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level)
+   * 1. Assume kafka logger == TRACE
+   * 2. Change kafka.server.ControllerServer logger to INFO
+   * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE)
+   * 4. Change kafka logger to ERROR
+   * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level)
    */
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
   def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = {
     client = createAdminClient
-    // step 1 - configure root logger
-    val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
-    val alterRootLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET)
+    val ancestorLogger = "kafka"
+    // step 1 - configure kafka logger
+    val initialAncestorLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
+    val alterAncestorLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(ancestorLogger, initialAncestorLogLevel), AlterConfigOp.OpType.SET)
    ).asJavaCollection
-    alterBrokerLoggers(alterRootLoggerEntry)
+    alterBrokerLoggers(alterAncestorLoggerEntry)
     val initialLoggerConfig = describeBrokerLoggers()
-    assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(initialAncestorLogLevel, initialLoggerConfig.get(ancestorLogger).value())
+    assertEquals(initialAncestorLogLevel, initialLoggerConfig.get("kafka.server.ControllerServer").value())
 
-    // step 2 - change KafkaController logger to INFO
+    // step 2 - change ControllerServer logger to INFO
     val alterControllerLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET)
+      new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET)
    ).asJavaCollection
     alterBrokerLoggers(alterControllerLoggerEntry)
     val changedControllerLoggerConfig = describeBrokerLoggers()
-    assertEquals(initialRootLogLevel, changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(initialAncestorLogLevel, changedControllerLoggerConfig.get(ancestorLogger).value())
+    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.server.ControllerServer").value())
 
-    // step 3 - unset KafkaController logger
+    // step 3 - unset ControllerServer logger
     val deleteControllerLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE)
+      new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", ""), AlterConfigOp.OpType.DELETE)
    ).asJavaCollection
     alterBrokerLoggers(deleteControllerLoggerEntry)
     val deletedControllerLoggerConfig = describeBrokerLoggers()
-    assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get(ancestorLogger).value())
+    assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get("kafka.server.ControllerServer").value())
 
-    val newRootLogLevel = LogLevelConfig.ERROR_LOG_LEVEL
-    val newAlterRootLoggerEntry = Seq(
-      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+    val newAncestorLogLevel = LogLevelConfig.ERROR_LOG_LEVEL
+    val newAlterAncestorLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET)
    ).asJavaCollection
-    alterBrokerLoggers(newAlterRootLoggerEntry)
-    val newRootLoggerConfig = describeBrokerLoggers()
-    assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
-    assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value())
+    alterBrokerLoggers(newAlterAncestorLoggerEntry)
+    val newAncestorLoggerConfig = describeBrokerLoggers()
+    assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get(ancestorLogger).value())
+    assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get("kafka.server.ControllerServer").value())
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  @Disabled // to be re-enabled once KAFKA-8779 is resolved
   def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = {
     client = createAdminClient
     val deleteRootLoggerEntry = Seq(
@@ -3203,7 +3200,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
   def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = {
     client = createAdminClient
     val validLoggerName = "kafka.server.KafkaRequestHandler"
@@ -3216,29 +3212,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported
    ).asJavaCollection
-    assertTrue(assertThrows(classOf[ExecutionException],
-      () => alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(appendLogLevelEntries)).getCause)
     assertLogLevelDidNotChange()
 
     val subtractLogLevelEntries = Seq(
      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported
    ).asJavaCollection
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause)
     assertLogLevelDidNotChange()
 
     val invalidLogLevelLogLevelEntries = Seq(
      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level
    ).asJavaCollection
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause)
     assertLogLevelDidNotChange()
 
     val invalidLoggerNameLogLevelEntries = Seq(
      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
      new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported
    ).asJavaCollection
-    assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause)
     assertLogLevelDidNotChange()
   }
 
@@ -3252,18 +3247,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     client = createAdminClient
 
     val alterLogLevelsEntries = Seq(
-      new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
+      new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL)
    ).asJavaCollection
     val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava)
     assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
   }
 
   def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = {
-    if (!validateOnly) {
-      for (entry <- entries.asScala)
-        changedBrokerLoggers.add(entry.configEntry().name())
-    }
-
     client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
      .values.get(brokerLoggerConfigResource).get()
   }
@@ -3271,28 +3261,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   def describeBrokerLoggers(): Config =
     client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get()
 
-  /**
-   * Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes.
-   * We need to clean up the changes done while testing.
-   */
-  private def teardownBrokerLoggers(): Unit = {
-    if (changedBrokerLoggers.nonEmpty) {
-      val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name.equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet
-      val unsetBrokerLoggersEntries = changedBrokerLoggers
-        .intersect(validLoggers)
-        .map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) }
-        .asJavaCollection
-
-      // ensure that we first reset the root logger to an arbitrary log level. Note that we cannot reset it to its original value
-      alterBrokerLoggers(List(
-        new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET)
-      ).asJavaCollection)
-      alterBrokerLoggers(unsetBrokerLoggersEntries)
-
-      changedBrokerLoggers.clear()
-    }
-  }
-
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
@@ -3607,4 +3575,17 @@ object PlaintextAdminIntegrationTest {
      Arguments.of("kraft", "consumer")
    ))
   }
+
+  /**
+   * Resets the logging configuration after the test.
+   */
+  def resetLogging(): Unit = {
+    org.apache.log4j.LogManager.resetConfiguration()
+    val stream = this.getClass.getResourceAsStream("/log4j.properties")
+    try {
+      PropertyConfigurator.configure(stream)
+    } finally {
+      stream.close()
+    }
+  }
 }