mirror of https://github.com/apache/kafka.git
KAFKA-19260 Move LoggingController to server module (#19687)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
Move `LoggingController` to server module and rewrite it in java. Reviewers: PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e107e69a51
commit
7da9457b36
|
|
@ -890,6 +890,9 @@ project(':server') {
|
|||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly libs.bndlib
|
||||
compileOnly libs.spotbugs
|
||||
|
||||
implementation project(':clients')
|
||||
implementation project(':metadata')
|
||||
implementation project(':server-common')
|
||||
|
|
@ -902,6 +905,7 @@ project(':server') {
|
|||
implementation libs.jacksonDatabind
|
||||
implementation libs.metrics
|
||||
implementation libs.slf4jApi
|
||||
implementation log4j2Libs
|
||||
|
||||
testImplementation project(':clients').sourceSets.test.output
|
||||
|
||||
|
|
|
|||
|
|
@ -88,6 +88,9 @@
|
|||
<allow pkg="org.apache.kafka.network.metrics" />
|
||||
<allow pkg="org.apache.kafka.storage.internals.log" />
|
||||
<allow pkg="org.apache.kafka.storage.internals.checkpoint" />
|
||||
<allow pkg="org.apache.logging.log4j" />
|
||||
<allow pkg="org.apache.logging.log4j.core" />
|
||||
<allow pkg="org.apache.logging.log4j.core.config" />
|
||||
<subpackage name="metrics">
|
||||
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
|
||||
<allow pkg="org.apache.kafka.server.telemetry" />
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package kafka.server.logger;
|
||||
|
||||
import kafka.utils.LoggingController;
|
||||
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
|
||||
import org.apache.kafka.common.config.LogLevelConfig;
|
||||
import org.apache.kafka.common.errors.ClusterAuthorizationException;
|
||||
|
|
@ -27,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
|
|||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
|
||||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.server.logger.LoggingController;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
|
|
@ -131,9 +130,9 @@ public class RuntimeLoggerManager {
|
|||
break;
|
||||
case DELETE:
|
||||
validateLoggerNameExists(loggerName);
|
||||
if (loggerName.equals(LoggingController.ROOT_LOGGER())) {
|
||||
if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
|
||||
throw new InvalidRequestException("Removing the log level of the " +
|
||||
LoggingController.ROOT_LOGGER() + " logger is not allowed");
|
||||
LoggingController.ROOT_LOGGER + " logger is not allowed");
|
||||
}
|
||||
break;
|
||||
case APPEND:
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ package kafka.server
|
|||
import kafka.network.RequestChannel
|
||||
|
||||
import java.util.{Collections, Properties}
|
||||
import kafka.utils.{LoggingController, Logging}
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
|
||||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
|
||||
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
|
||||
|
|
@ -35,6 +35,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
|
|||
import org.apache.kafka.coordinator.group.GroupConfig
|
||||
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
|
||||
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
|
||||
import org.apache.kafka.server.logger.LoggingController
|
||||
import org.apache.kafka.server.metrics.ClientMetricsConfigs
|
||||
import org.apache.kafka.storage.internals.log.LogConfig
|
||||
|
||||
|
|
@ -130,7 +131,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
|
|||
else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
|
||||
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
|
||||
else
|
||||
createResponseConfig(LoggingController.loggers,
|
||||
createResponseConfig(LoggingController.loggers.asScala,
|
||||
(name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
|
||||
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
|
||||
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@
|
|||
package kafka.utils
|
||||
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.apache.kafka.server.logger.LoggingController
|
||||
import org.slf4j.{LoggerFactory, Marker, MarkerFactory}
|
||||
|
||||
|
||||
object Log4jControllerRegistration {
|
||||
private val logger = Logger(this.getClass.getName)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,174 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.utils
|
||||
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import kafka.utils.LoggingController.ROOT_LOGGER
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.logging.log4j.core.LoggerContext
|
||||
import org.apache.logging.log4j.core.config.Configurator
|
||||
import org.apache.logging.log4j.{Level, LogManager}
|
||||
|
||||
import java.util
|
||||
import java.util.Locale
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
|
||||
object LoggingController {
|
||||
|
||||
private val logger = Logger[LoggingController]
|
||||
|
||||
/**
|
||||
* Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature.
|
||||
*
|
||||
* The root logger's name is changed in log4j2 to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]) but for backward-
|
||||
* compatibility. Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name.
|
||||
*/
|
||||
val ROOT_LOGGER = "root"
|
||||
|
||||
private[this] val delegate: LoggingControllerDelegate = {
|
||||
try {
|
||||
new Log4jCoreController
|
||||
} catch {
|
||||
case _: ClassCastException | _: LinkageError =>
|
||||
logger.info("No supported logging implementation found. Logging configuration endpoint will be disabled.")
|
||||
new NoOpController
|
||||
case e: Exception =>
|
||||
logger.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e)
|
||||
new NoOpController
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of the log4j loggers and their assigned log level.
|
||||
* If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured.
|
||||
*/
|
||||
def loggers: Map[String, String] = delegate.loggers
|
||||
|
||||
/**
|
||||
* Sets the log level of a particular logger. If the given logLevel is not an available level
|
||||
* (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG.
|
||||
*
|
||||
* @see [[Level.toLevel]]
|
||||
*/
|
||||
def logLevel(loggerName: String, logLevel: String): Boolean = delegate.logLevel(loggerName, logLevel)
|
||||
|
||||
def unsetLogLevel(loggerName: String): Boolean = delegate.unsetLogLevel(loggerName)
|
||||
|
||||
def loggerExists(loggerName: String): Boolean = delegate.loggerExists(loggerName)
|
||||
}
|
||||
|
||||
private class NoOpController extends LoggingControllerDelegate {
|
||||
override def loggers: Map[String, String] = Map.empty
|
||||
|
||||
override def logLevel(loggerName: String, logLevel: String): Boolean = false
|
||||
|
||||
override def unsetLogLevel(loggerName: String): Boolean = false
|
||||
}
|
||||
|
||||
private class Log4jCoreController extends LoggingControllerDelegate {
|
||||
private[this] val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext]
|
||||
|
||||
override def loggers: Map[String, String] = {
|
||||
val rootLoggerLevel = logContext.getRootLogger.getLevel.toString
|
||||
|
||||
// Loggers defined in the configuration
|
||||
val configured = logContext.getConfiguration.getLoggers.asScala
|
||||
.values
|
||||
.filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME))
|
||||
.map { logger =>
|
||||
logger.getName -> logger.getLevel.toString
|
||||
}.toMap
|
||||
|
||||
// Loggers actually running
|
||||
val actual = logContext.getLoggers.asScala
|
||||
.filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME))
|
||||
.map { logger =>
|
||||
logger.getName -> logger.getLevel.toString
|
||||
}.toMap
|
||||
|
||||
(configured ++ actual) + (ROOT_LOGGER -> rootLoggerLevel)
|
||||
}
|
||||
|
||||
override def logLevel(loggerName: String, logLevel: String): Boolean = {
|
||||
if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel))
|
||||
return false
|
||||
|
||||
val level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT))
|
||||
|
||||
if (loggerName == ROOT_LOGGER) {
|
||||
Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level)
|
||||
true
|
||||
} else {
|
||||
if (loggerExists(loggerName) && level != null) {
|
||||
Configurator.setLevel(loggerName, level)
|
||||
true
|
||||
}
|
||||
else false
|
||||
}
|
||||
}
|
||||
|
||||
override def unsetLogLevel(loggerName: String): Boolean = {
|
||||
val nullLevel: Level = null
|
||||
if (loggerName == ROOT_LOGGER) {
|
||||
Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel)
|
||||
true
|
||||
} else {
|
||||
if (loggerExists(loggerName)) {
|
||||
Configurator.setLevel(loggerName, nullLevel)
|
||||
true
|
||||
}
|
||||
else false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class LoggingControllerDelegate {
|
||||
def loggers: Map[String, String]
|
||||
def logLevel(loggerName: String, logLevel: String): Boolean
|
||||
def unsetLogLevel(loggerName: String): Boolean
|
||||
def loggerExists(loggerName: String): Boolean = loggers.contains(loggerName)
|
||||
}
|
||||
|
||||
/**
|
||||
* An MBean that allows the user to dynamically alter log4j levels at runtime.
|
||||
* The companion object contains the singleton instance of this class and
|
||||
* registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
|
||||
* of the companion object.
|
||||
*/
|
||||
class LoggingController extends LoggingControllerMBean {
|
||||
|
||||
def getLoggers: util.List[String] = {
|
||||
// we replace scala collection by java collection so mbean client is able to deserialize it without scala library.
|
||||
new util.ArrayList[String](LoggingController.loggers.map {
|
||||
case (logger, level) => s"$logger=$level"
|
||||
}.toSeq.asJava)
|
||||
}
|
||||
|
||||
def getLogLevel(loggerName: String): String = {
|
||||
LoggingController.loggers.getOrElse(loggerName, "No such logger.")
|
||||
}
|
||||
|
||||
def setLogLevel(loggerName: String, level: String): Boolean = LoggingController.logLevel(loggerName, level)
|
||||
}
|
||||
|
||||
trait LoggingControllerMBean {
|
||||
def getLoggers: java.util.List[String]
|
||||
def getLogLevel(logger: String): String
|
||||
def setLogLevel(logger: String, level: String): Boolean
|
||||
}
|
||||
|
|
@ -16,13 +16,12 @@
|
|||
*/
|
||||
package kafka.server.logger;
|
||||
|
||||
import kafka.utils.LoggingController;
|
||||
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp;
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
|
||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
|
||||
import org.apache.kafka.server.logger.LoggingController;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
|
@ -82,18 +81,18 @@ public class RuntimeLoggerManagerTest {
|
|||
@Test
|
||||
public void testValidateSetRootLogLevelConfig() {
|
||||
MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig().
|
||||
setName(LoggingController.ROOT_LOGGER()).
|
||||
setName(LoggingController.ROOT_LOGGER).
|
||||
setConfigOperation(OpType.SET.id()).
|
||||
setValue("TRACE")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateRemoveRootLogLevelConfigNotAllowed() {
|
||||
assertEquals("Removing the log level of the " + LoggingController.ROOT_LOGGER() +
|
||||
assertEquals("Removing the log level of the " + LoggingController.ROOT_LOGGER +
|
||||
" logger is not allowed",
|
||||
Assertions.assertThrows(InvalidRequestException.class,
|
||||
() -> MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig().
|
||||
setName(LoggingController.ROOT_LOGGER()).
|
||||
setName(LoggingController.ROOT_LOGGER).
|
||||
setConfigOperation(OpType.DELETE.id()).
|
||||
setValue("")))).getMessage());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import java.{time, util}
|
|||
import kafka.integration.KafkaServerTestHarness
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.utils.TestUtils._
|
||||
import kafka.utils.{LoggingController, TestInfoUtils, TestUtils}
|
||||
import kafka.utils.{TestInfoUtils, TestUtils}
|
||||
import org.apache.kafka.clients.HostResolver
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
||||
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
|
||||
|
|
@ -53,6 +53,7 @@ import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
|
|||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.security.authorizer.AclEntry
|
||||
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.logger.LoggingController
|
||||
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
|
||||
import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
|
||||
import org.apache.logging.log4j.core.config.Configurator
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package kafka.utils
|
||||
|
||||
import org.apache.kafka.server.logger.LoggingController
|
||||
import java.lang.management.ManagementFactory
|
||||
|
||||
import javax.management.ObjectName
|
||||
|
|
@ -24,17 +25,8 @@ import org.junit.jupiter.api.Test
|
|||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
|
||||
class LoggingTest extends Logging {
|
||||
|
||||
@Test
|
||||
def testTypeOfGetLoggers(): Unit = {
|
||||
val log4jController = new LoggingController
|
||||
// the return object of getLoggers must be a collection instance from java standard library.
|
||||
// That enables mbean client to deserialize it without extra libraries.
|
||||
assertEquals(classOf[java.util.ArrayList[String]], log4jController.getLoggers.getClass)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLog4jControllerIsRegistered(): Unit = {
|
||||
val mbs = ManagementFactory.getPlatformMBeanServer
|
||||
|
|
@ -42,7 +34,7 @@ class LoggingTest extends Logging {
|
|||
val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController")
|
||||
assertTrue(mbs.isRegistered(log4jControllerName), "kafka.utils.Log4jController is not registered")
|
||||
val log4jInstance = mbs.getObjectInstance(log4jControllerName)
|
||||
assertEquals("kafka.utils.LoggingController", log4jInstance.getClassName)
|
||||
assertEquals("org.apache.kafka.server.logger.LoggingController", log4jInstance.getClassName)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import kafka.network.RequestChannel
|
|||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
import kafka.server.metadata.KRaftMetadataCache
|
||||
import kafka.server.share.SharePartitionManager
|
||||
import kafka.utils.{CoreUtils, Logging, LoggingController, TestUtils}
|
||||
import kafka.utils.{CoreUtils, Logging, TestUtils}
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
||||
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
|
||||
import org.apache.kafka.common._
|
||||
|
|
@ -90,6 +90,7 @@ import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
|
|||
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
|
||||
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.logger.LoggingController
|
||||
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
|
||||
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
|
||||
import org.apache.kafka.server.quota.ThrottleCallback
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.logger;
|
||||
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.LoggerContext;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.apache.logging.log4j.core.config.LoggerConfig;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
class Log4jCoreController implements LoggingControllerDelegate {
|
||||
private final LoggerContext logContext;
|
||||
|
||||
public Log4jCoreController() {
|
||||
this.logContext = (LoggerContext) LogManager.getContext(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> loggers() {
|
||||
String rootLoggerLevel = logContext.getRootLogger().getLevel().toString();
|
||||
|
||||
Map<String, String> result = new HashMap<>();
|
||||
// Loggers defined in the configuration
|
||||
for (LoggerConfig logger : logContext.getConfiguration().getLoggers().values()) {
|
||||
if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) {
|
||||
result.put(logger.getName(), logger.getLevel().toString());
|
||||
}
|
||||
}
|
||||
// Loggers actually running
|
||||
for (Logger logger : logContext.getLoggers()) {
|
||||
if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) {
|
||||
result.put(logger.getName(), logger.getLevel().toString());
|
||||
}
|
||||
}
|
||||
// Add root logger
|
||||
result.put(LoggingController.ROOT_LOGGER, rootLoggerLevel);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean logLevel(String loggerName, String logLevel) {
|
||||
if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel))
|
||||
return false;
|
||||
|
||||
Level level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT));
|
||||
|
||||
if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
|
||||
Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level);
|
||||
return true;
|
||||
}
|
||||
if (loggerExists(loggerName) && level != null) {
|
||||
Configurator.setLevel(loggerName, level);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unsetLogLevel(String loggerName) {
|
||||
Level nullLevel = null;
|
||||
if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
|
||||
Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel);
|
||||
return true;
|
||||
}
|
||||
if (loggerExists(loggerName)) {
|
||||
Configurator.setLevel(loggerName, nullLevel);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.logger;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An MBean that allows the user to dynamically alter log4j levels at runtime.
|
||||
* The companion object contains the singleton instance of this class and
|
||||
* registers the MBean. The {@code kafka.utils.Logging} trait forces initialization
|
||||
* of the companion object.
|
||||
*/
|
||||
public class LoggingController implements LoggingControllerMBean {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(LoggingController.class);
|
||||
|
||||
/**
|
||||
* Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature.
|
||||
* The root logger's name is changed in log4j2 to empty string (see: {@link LogManager#ROOT_LOGGER_NAME}) but for backward-compatibility.
|
||||
* Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name.
|
||||
*/
|
||||
public static final String ROOT_LOGGER = "root";
|
||||
|
||||
private static final LoggingControllerDelegate DELEGATE;
|
||||
|
||||
static {
|
||||
LoggingControllerDelegate tempDelegate;
|
||||
try {
|
||||
tempDelegate = new Log4jCoreController();
|
||||
} catch (ClassCastException | LinkageError e) {
|
||||
LOGGER.info("No supported logging implementation found. Logging configuration endpoint will be disabled.");
|
||||
tempDelegate = new NoOpController();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e);
|
||||
tempDelegate = new NoOpController();
|
||||
}
|
||||
DELEGATE = tempDelegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of the log4j loggers and their assigned log level.
|
||||
* If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured.
|
||||
*/
|
||||
public static Map<String, String> loggers() {
|
||||
return DELEGATE.loggers();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the log level of a particular logger. If the given logLevel is not an available level
|
||||
* (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG.
|
||||
*
|
||||
* @see Level#toLevel(String, Level)
|
||||
*/
|
||||
public static boolean logLevel(String loggerName, String logLevel) {
|
||||
return DELEGATE.logLevel(loggerName, logLevel);
|
||||
}
|
||||
|
||||
public static boolean unsetLogLevel(String loggerName) {
|
||||
return DELEGATE.unsetLogLevel(loggerName);
|
||||
}
|
||||
|
||||
public static boolean loggerExists(String loggerName) {
|
||||
return DELEGATE.loggerExists(loggerName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLoggers() {
|
||||
return LoggingController.loggers()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(entry -> entry.getKey() + "=" + entry.getValue())
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLogLevel(String loggerName) {
|
||||
return LoggingController.loggers().getOrDefault(loggerName, "No such logger.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean setLogLevel(String loggerName, String level) {
|
||||
return LoggingController.logLevel(loggerName, level);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.logger;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface LoggingControllerDelegate {
|
||||
Map<String, String> loggers();
|
||||
boolean logLevel(String loggerName, String logLevel);
|
||||
boolean unsetLogLevel(String loggerName);
|
||||
default boolean loggerExists(String loggerName) {
|
||||
return loggers().containsKey(loggerName);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public interface LoggingControllerMBean {
|
||||
List<String> getLoggers();
|
||||
String getLogLevel(String logger);
|
||||
boolean setLogLevel(String logger, String level);
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.logger;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
class NoOpController implements LoggingControllerDelegate {
|
||||
|
||||
@Override
|
||||
public Map<String, String> loggers() {
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean logLevel(String loggerName, String logLevel) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unsetLogLevel(String loggerName) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue