setWorkerLoggerLevel(String namespace, String level);
+
+ /**
+ * Set the level for a logging namespace (i.e., a specific logger and all of its children) for all workers
+ * in the cluster. Changes should last only for the lifetime of workers, and should be discarded
+ * if/when workers are restarted.
+ * @param namespace the logging namespace to alter; may not be null
+ * @param level the new level to set for the namespace; may not be null
+ */
+ void setClusterLoggerLevel(String namespace, String level);
+
enum ConfigReloadAction {
NONE,
RESTART
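
For orientation (illustrative only, not part of the patch): with both scopes in place, a client can target one worker or the whole cluster through the same endpoint. The sketch below assumes a worker REST listener at http://localhost:8083 and that the loggers endpoint is mounted at /admin/loggers, matching the LoggingResource changes later in this patch; the class name and URLs are hypothetical.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class LoggingScopeExample {
        public static void main(String[] args) throws Exception {
            // scope=worker (the default) only affects the worker that receives the request
            setLevel("http://localhost:8083/admin/loggers/org.apache.kafka.connect?scope=worker");
            // scope=cluster is written to the config topic and applied by all running workers
            setLevel("http://localhost:8083/admin/loggers/org.apache.kafka.connect?scope=cluster");
        }

        private static void setLevel(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write("{\"level\": \"DEBUG\"}".getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("Response: " + conn.getResponseCode());
        }
    }
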
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java
new file mode 100644
index 00000000000..b0d58871c38
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manages logging levels on a single worker. Supports dynamic adjustment and querying
+ * of logging levels.
+ *
+ * This class is thread-safe; concurrent calls to all of its public methods from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+ private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+ /**
+ * Log4j uses "root" (case-insensitive) as the name of the root logger.
+ */
+ private static final String ROOT_LOGGER_NAME = "root";
+
+ private final Time time;
+ private final Map<String, Long> lastModifiedTimes;
+
+ public Loggers(Time time) {
+ this.time = time;
+ this.lastModifiedTimes = new HashMap<>();
+ }
+
+ /**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not be null
+ * @return the current level (falling back on the effective level if necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+ public synchronized LoggerLevel level(String logger) {
+ Objects.requireNonNull(logger, "Logger may not be null");
+
+ org.apache.log4j.Logger foundLogger = null;
+ if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+ foundLogger = rootLogger();
+ } else {
+ Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+ // search within existing loggers for the given name.
+ // using LogManager.getLogger() will create a logger if it doesn't exist
+ // (potential leak since these don't get cleaned up).
+ while (en.hasMoreElements()) {
+ org.apache.log4j.Logger l = en.nextElement();
+ if (logger.equals(l.getName())) {
+ foundLogger = l;
+ break;
+ }
+ }
+ }
+
+ if (foundLogger == null) {
+ log.warn("Unable to find level for logger {}", logger);
+ return null;
+ }
+
+ return loggerLevel(foundLogger);
+ }
+
+ /**
+ * Retrieve the current levels of all known loggers
+ * @return the levels of all known loggers; may be empty, but never null
+ */
+ public synchronized Map<String, LoggerLevel> allLevels() {
+ Map<String, LoggerLevel> result = new TreeMap<>();
+
+ Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+ Collections.list(enumeration)
+ .stream()
+ .filter(logger -> logger.getLevel() != null)
+ .forEach(logger -> result.put(logger.getName(), loggerLevel(logger)));
+
+ org.apache.log4j.Logger root = rootLogger();
+ if (root.getLevel() != null) {
+ result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+ }
+
+ return result;
+ }
+
+ /**
+ * Set the level for the specified logger and all of its children
+ * @param namespace the name of the logger to adjust along with its children; may not be null
+ * @param level the level to set for the logger and its children; may not be null
+ * @return all loggers that were affected by this action, sorted by their natural ordering;
+ * may be empty, but never null
+ */
+ public synchronized List<String> setLevel(String namespace, Level level) {
+ Objects.requireNonNull(namespace, "Logging namespace may not be null");
+ Objects.requireNonNull(level, "Level may not be null");
+
+ log.info("Setting level of namespace {} and children to {}", namespace, level);
+ List<org.apache.log4j.Logger> childLoggers = loggers(namespace);
+
+ List<String> result = new ArrayList<>();
+ for (org.apache.log4j.Logger logger : childLoggers) {
+ setLevel(logger, level);
+ result.add(logger.getName());
+ }
+ Collections.sort(result);
+
+ return result;
+ }
+
+ /**
+ * Retrieve all known loggers within a given namespace, creating an ancestor logger for that
+ * namespace if one does not already exist
+ * @param namespace the namespace that the loggers should fall under; may not be null
+ * @return all loggers that fall under the given namespace; never null, and will always contain
+ * at least one logger (the ancestor logger for the namespace)
+ */
+ private synchronized List<org.apache.log4j.Logger> loggers(String namespace) {
+ Objects.requireNonNull(namespace, "Logging namespace may not be null");
+
+ if (ROOT_LOGGER_NAME.equalsIgnoreCase(namespace)) {
+ List<org.apache.log4j.Logger> result = Collections.list(currentLoggers());
+ result.add(rootLogger());
+ return result;
+ }
+
+ List<org.apache.log4j.Logger> result = new ArrayList<>();
+ org.apache.log4j.Logger ancestorLogger = lookupLogger(namespace);
+ Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+ boolean present = false;
+ while (en.hasMoreElements()) {
+ org.apache.log4j.Logger current = en.nextElement();
+ if (current.getName().startsWith(namespace)) {
+ result.add(current);
+ }
+ if (namespace.equals(current.getName())) {
+ present = true;
+ }
+ }
+
+ if (!present) {
+ result.add(ancestorLogger);
+ }
+
+ return result;
+ }
+
+ // visible for testing
+ org.apache.log4j.Logger lookupLogger(String logger) {
+ return LogManager.getLogger(logger);
+ }
+
+ @SuppressWarnings("unchecked")
+ // visible for testing
+ Enumeration<org.apache.log4j.Logger> currentLoggers() {
+ return LogManager.getCurrentLoggers();
+ }
+
+ // visible for testing
+ org.apache.log4j.Logger rootLogger() {
+ return LogManager.getRootLogger();
+ }
+
+ private void setLevel(org.apache.log4j.Logger logger, Level level) {
+ Level currentLevel = logger.getLevel();
+ if (currentLevel == null)
+ currentLevel = logger.getEffectiveLevel();
+
+ if (level.equals(currentLevel)) {
+ log.debug("Skipping update for logger {} since its level is already {}", logger.getName(), level);
+ return;
+ }
+
+ log.debug("Setting level of logger {} (excluding children) to {}", logger.getName(), level);
+ logger.setLevel(level);
+ lastModifiedTimes.put(logger.getName(), time.milliseconds());
+ }
+
+ private LoggerLevel loggerLevel(org.apache.log4j.Logger logger) {
+ Level level = logger.getLevel();
+ if (level == null)
+ level = logger.getEffectiveLevel();
+
+ Long lastModified = lastModifiedTimes.get(logger.getName());
+ return new LoggerLevel(Objects.toString(level), lastModified);
+ }
+
+}
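
A minimal usage sketch of the new class (illustrative only, not part of the patch), assuming Log4j 1.x on the classpath as in the runtime module; the class name is hypothetical.

    import java.util.List;

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.connect.runtime.Loggers;
    import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
    import org.apache.log4j.Level;

    public class LoggersExample {
        public static void main(String[] args) {
            Loggers loggers = new Loggers(Time.SYSTEM);

            // Set org.apache.kafka.connect and all of its children to DEBUG; the result
            // is the sorted list of names of every logger that was affected
            List<String> affected = loggers.setLevel("org.apache.kafka.connect", Level.DEBUG);
            System.out.println("Affected: " + affected);

            // level() falls back on the effective (inherited) level when none was set
            // directly on the logger, and reports when the level was last modified
            LoggerLevel level = loggers.level("org.apache.kafka.connect");
            System.out.println(level.level() + " (last modified: " + level.lastModified() + ")");
        }
    }
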
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 94d19207cdc..48b5fad3423 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -279,7 +279,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ExecutorService forwardRequestExecutor,
// https://github.com/mockito/mockito/issues/2601 explains why we can't use varargs here
AutoCloseable[] uponShutdown) {
- super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);
this.time = time;
this.herderMetrics = new HerderMetrics(metrics);
@@ -1612,6 +1612,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return true;
}
+ @Override
+ public void setClusterLoggerLevel(String namespace, String level) {
+ configBackingStore.putLoggerLevel(namespace, level);
+ }
+
// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
return assignment != null && member.memberId().equals(assignment.leader());
@@ -2376,6 +2381,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
member.wakeup();
}
+
+ @Override
+ public void onLoggingLevelUpdate(String namespace, String level) {
+ setWorkerLoggerLevel(namespace, level);
+ }
}
class DistributedHerderRequest implements HerderRequest, Comparable<DistributedHerderRequest> {
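
The two overrides above complete the cluster-scope loop: setClusterLoggerLevel() persists the request through the config backing store, and onLoggingLevelUpdate() fires on every worker (including the one that served the request) once the record is read back, delegating to setWorkerLoggerLevel(). A deliberately simplified, hypothetical simulation of that flow (these stand-ins are not the real APIs):

    import java.util.function.BiConsumer;

    public class ClusterLevelFlowExample {
        public static void main(String[] args) {
            // stand-in for ConfigBackingStore.UpdateListener#onLoggingLevelUpdate,
            // which the real herder wires up to setWorkerLoggerLevel()
            BiConsumer<String, String> onLoggingLevelUpdate =
                (namespace, level) -> System.out.println("apply " + level + " to " + namespace + ".*");

            // stand-in for DistributedHerder#setClusterLoggerLevel: in the real code,
            // configBackingStore.putLoggerLevel(namespace, level) persists to Kafka here,
            // and the callback fires once the record is read back from the config topic
            onLoggingLevelUpdate.accept("org.apache.kafka.connect", "DEBUG");
        }
    }
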
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
index ca9eb731c0c..6cef19c22b8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -57,7 +57,7 @@ public class ConnectRestServer extends RestServer {
@Override
protected Collection<ConnectResource> adminResources() {
return Arrays.asList(
- new LoggingResource()
+ new LoggingResource(herder)
);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java
new file mode 100644
index 00000000000..4a9a6be32e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class LoggerLevel {
+
+ private final String level;
+ private final Long lastModified;
+
+ public LoggerLevel(
+ @JsonProperty("level") String level,
+ @JsonProperty("last_modified") Long lastModified
+ ) {
+ this.level = Objects.requireNonNull(level, "level may not be null");
+ this.lastModified = lastModified;
+ }
+
+ @JsonProperty
+ public String level() {
+ return level;
+ }
+
+ @JsonProperty("last_modified")
+ public Long lastModified() {
+ return lastModified;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ LoggerLevel that = (LoggerLevel) o;
+ return level.equals(that.level) && Objects.equals(lastModified, that.lastModified);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(level, lastModified);
+ }
+
+ @Override
+ public String toString() {
+ return "LoggerLevel{"
+ + "level='" + level + '\''
+ + ", lastModified=" + lastModified
+ + '}';
+ }
+}
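
A quick serialization sanity check (illustrative only, not part of the patch): the annotations above should yield a snake_cased last_modified field, with null for levels that were never modified dynamically. Field order may vary; the class name is hypothetical.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;

    public class LoggerLevelJsonExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Expected: {"level":"DEBUG","last_modified":1696951712135}
            System.out.println(mapper.writeValueAsString(new LoggerLevel("DEBUG", 1696951712135L)));
            // Expected: {"level":"INFO","last_modified":null}
            System.out.println(mapper.writeValueAsString(new LoggerLevel("INFO", null)));
        }
    }
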
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index 812cf696563..b215fe72adb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -17,28 +17,28 @@
package org.apache.kafka.connect.runtime.rest.resources;
import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
-import java.util.TreeMap;
/**
* A set of endpoints to adjust the log levels of runtime loggers.
@@ -48,10 +48,16 @@ import java.util.TreeMap;
@Consumes(MediaType.APPLICATION_JSON)
public class LoggingResource implements ConnectResource {
- /**
- * Log4j uses "root" (case-insensitive) as name of the root logger.
- */
- private static final String ROOT_LOGGER_NAME = "root";
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(LoggingResource.class);
+
+ private static final String WORKER_SCOPE = "worker";
+ private static final String CLUSTER_SCOPE = "cluster";
+
+ private final Herder herder;
+
+ public LoggingResource(Herder herder) {
+ this.herder = herder;
+ }
@Override
public void requestTimeout(long requestTimeoutMs) {
@@ -67,19 +73,7 @@ public class LoggingResource implements ConnectResource {
@Path("/")
@Operation(summary = "List the current loggers that have their levels explicitly set and their log levels")
public Response listLoggers() {
- Map<String, Map<String, String>> loggers = new TreeMap<>();
- Enumeration<Logger> enumeration = currentLoggers();
- Collections.list(enumeration)
- .stream()
- .filter(logger -> logger.getLevel() != null)
- .forEach(logger -> loggers.put(logger.getName(), levelToMap(logger)));
-
- Logger root = rootLogger();
- if (root.getLevel() != null) {
- loggers.put(ROOT_LOGGER_NAME, levelToMap(root));
- }
-
- return Response.ok(loggers).build();
+ return Response.ok(herder.allLoggerLevels()).build();
}
/**
@@ -94,34 +88,17 @@ public class LoggingResource implements ConnectResource {
public Response getLogger(final @PathParam("logger") String namedLogger) {
Objects.requireNonNull(namedLogger, "require non-null name");
- Logger logger = null;
- if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
- logger = rootLogger();
- } else {
- Enumeration<Logger> en = currentLoggers();
- // search within existing loggers for the given name.
- // using LogManger.getLogger() will create a logger if it doesn't exist
- // (potential leak since these don't get cleaned up).
- while (en.hasMoreElements()) {
- Logger l = en.nextElement();
- if (namedLogger.equals(l.getName())) {
- logger = l;
- break;
- }
- }
- }
- if (logger == null) {
+ LoggerLevel loggerLevel = herder.loggerLevel(namedLogger);
+ if (loggerLevel == null)
throw new NotFoundException("Logger " + namedLogger + " not found.");
- } else {
- return Response.ok(effectiveLevelToMap(logger)).build();
- }
- }
+ return Response.ok(loggerLevel).build();
+ }
/**
* Adjust level of a named logger. If the name corresponds to an ancestor, then the log level is applied to all child loggers.
*
- * @param namedLogger name of the logger
+ * @param namespace name of the logger
* @param levelMap a map that is expected to contain one key 'level', and a value that is one of the log4j levels:
* DEBUG, ERROR, FATAL, INFO, TRACE, WARN
* @return names of loggers whose levels were modified
@@ -129,87 +106,36 @@ public class LoggingResource implements ConnectResource {
@PUT
@Path("/{logger}")
@Operation(summary = "Set the log level for the specified logger")
- public Response setLevel(final @PathParam("logger") String namedLogger,
- final Map<String, String> levelMap) {
- String desiredLevelStr = levelMap.get("level");
- if (desiredLevelStr == null) {
+ @SuppressWarnings("fallthrough")
+ public Response setLevel(final @PathParam("logger") String namespace,
+ final Map<String, String> levelMap,
+ @DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) {
+ if (scope == null) {
+ log.warn("Received null scope in request to adjust logging level; will default to {}", WORKER_SCOPE);
+ scope = WORKER_SCOPE;
+ }
+
+ String levelString = levelMap.get("level");
+ if (levelString == null) {
throw new BadRequestException("Desired 'level' parameter was not specified in request.");
}
- Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), null);
+ // Make sure that this is a valid level
+ Level level = Level.toLevel(levelString.toUpperCase(Locale.ROOT), null);
if (level == null) {
- throw new NotFoundException("invalid log level '" + desiredLevelStr + "'.");
+ throw new NotFoundException("invalid log level '" + levelString + "'.");
}
- List<Logger> childLoggers;
- if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
- childLoggers = Collections.list(currentLoggers());
- childLoggers.add(rootLogger());
- } else {
- childLoggers = new ArrayList<>();
- Logger ancestorLogger = lookupLogger(namedLogger);
- Enumeration en = currentLoggers();
- boolean present = false;
- while (en.hasMoreElements()) {
- Logger current = en.nextElement();
- if (current.getName().startsWith(namedLogger)) {
- childLoggers.add(current);
- }
- if (namedLogger.equals(current.getName())) {
- present = true;
- }
- }
- if (!present) {
- childLoggers.add(ancestorLogger);
- }
+ switch (scope.toLowerCase(Locale.ROOT)) {
+ default:
+ log.warn("Received invalid scope '{}' in request to adjust logging level; will default to {}", scope, WORKER_SCOPE);
+ case WORKER_SCOPE:
+ List<String> affectedLoggers = herder.setWorkerLoggerLevel(namespace, levelString);
+ return Response.ok(affectedLoggers).build();
+ case CLUSTER_SCOPE:
+ herder.setClusterLoggerLevel(namespace, levelString);
+ return Response.noContent().build();
}
-
- List<String> modifiedLoggerNames = new ArrayList<>();
- for (Logger logger: childLoggers) {
- logger.setLevel(level);
- modifiedLoggerNames.add(logger.getName());
- }
- Collections.sort(modifiedLoggerNames);
-
- return Response.ok(modifiedLoggerNames).build();
}
- protected Logger lookupLogger(String namedLogger) {
- return LogManager.getLogger(namedLogger);
- }
-
- @SuppressWarnings("unchecked")
- protected Enumeration<Logger> currentLoggers() {
- return LogManager.getCurrentLoggers();
- }
-
- protected Logger rootLogger() {
- return LogManager.getRootLogger();
- }
-
- /**
- *
- * Map representation of a logger's effective log level.
- *
- * @param logger a non-null log4j logger
- * @return a singleton map whose key is level and the value is the string representation of the logger's effective log level.
- */
- private static Map<String, String> effectiveLevelToMap(Logger logger) {
- Level level = logger.getLevel();
- if (level == null) {
- level = logger.getEffectiveLevel();
- }
- return Collections.singletonMap("level", String.valueOf(level));
- }
-
- /**
- *
- * Map representation of a logger's log level.
- *
- * @param logger a non-null log4j logger
- * @return a singleton map whose key is level and the value is the string representation of the logger's log level.
- */
- private static Map<String, String> levelToMap(Logger logger) {
- return Collections.singletonMap("level", String.valueOf(logger.getLevel()));
- }
}
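
A note on the @SuppressWarnings("fallthrough") in setLevel() above: the default branch is placed first and deliberately falls through into the worker case, so an unrecognized scope is logged and then treated as scope=worker. A standalone reproduction of the pattern (illustrative only, not part of the patch):

    import java.util.Locale;

    public class ScopeFallthroughExample {
        @SuppressWarnings("fallthrough")
        static String resolveScope(String scope) {
            switch (scope.toLowerCase(Locale.ROOT)) {
                default:
                    System.err.println("Invalid scope '" + scope + "'; treating as worker scope");
                    // deliberate fallthrough into the worker case
                case "worker":
                    return "worker";
                case "cluster":
                    return "cluster";
            }
        }

        public static void main(String[] args) {
            System.out.println(resolveScope("worker"));  // worker
            System.out.println(resolveScope("CLUSTER")); // cluster
            System.out.println(resolveScope("bogus"));   // warns, then worker
        }
    }
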
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index babb157f772..0da89b2f668 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.standalone;
import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
@@ -82,7 +83,9 @@ public class StandaloneHerder extends AbstractHerder {
kafkaClusterId,
new MemoryStatusBackingStore(),
new MemoryConfigBackingStore(worker.configTransformer()),
- connectorClientConfigOverridePolicy);
+ connectorClientConfigOverridePolicy,
+ Time.SYSTEM
+ );
}
// visible for testing
@@ -91,8 +94,9 @@ public class StandaloneHerder extends AbstractHerder {
String kafkaClusterId,
StatusBackingStore statusBackingStore,
MemoryConfigBackingStore configBackingStore,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
- super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ Time time) {
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);
this.configState = ClusterConfigState.EMPTY;
this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
configBackingStore.setUpdateListener(new ConfigUpdateListener());
@@ -404,6 +408,12 @@ public class StandaloneHerder extends AbstractHerder {
return true;
}
+ @Override
+ public void setClusterLoggerLevel(String namespace, String level) {
+ // In standalone mode, this single worker is the entire cluster
+ setWorkerLoggerLevel(namespace, level);
+ }
+
private void startConnector(String connName, Callback<TargetState> onStart) {
Map<String, String> connConfigs = configState.connectorConfig(connName);
TargetState targetState = configState.targetState(connName);
@@ -544,6 +554,11 @@ public class StandaloneHerder extends AbstractHerder {
public void onRestartRequest(RestartRequest restartRequest) {
// no-op
}
+
+ @Override
+ public void onLoggingLevelUpdate(String namespace, String level) {
+ // no-op
+ }
}
static class StandaloneHerderRequest implements HerderRequest {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index e867087593a..c869c545f80 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -122,6 +122,14 @@ public interface ConfigBackingStore {
default void claimWritePrivileges() {
}
+ /**
+ * Emit a new level for the specified logging namespace (and all of its children). This level should
+ * be applied by all workers currently in the cluster, but not to workers that join after it is stored.
+ * @param namespace the namespace to adjust; may not be null
+ * @param level the new level for the namespace; may not be null
+ */
+ void putLoggerLevel(String namespace, String level);
+
/**
* Set an update listener to get notifications when there are new records written to the backing store.
* @param listener non-null listener
@@ -164,6 +172,13 @@ public interface ConfigBackingStore {
* @param restartRequest the {@link RestartRequest restart request}
*/
void onRestartRequest(RestartRequest restartRequest);
+
+ /**
+ * Invoked when a dynamic log level adjustment has been read
+ * @param namespace the namespace to adjust; never null
+ * @param level the level to set the namespace to; never null
+ */
+ void onLoggingLevelUpdate(String namespace, String level);
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index ad8608b76cd..a95c8249942 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -271,6 +271,14 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
.field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
.build();
+ public static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-";
+ public static String LOGGER_CLUSTER_KEY(String namespace) {
+ return LOGGER_CLUSTER_PREFIX + namespace;
+ }
+ public static final Schema LOGGER_LEVEL_V0 = SchemaBuilder.struct()
+ .field("level", Schema.STRING_SCHEMA)
+ .build();
+
// Visible for testing
static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
@@ -732,6 +740,20 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
}
}
+ @Override
+ public void putLoggerLevel(String namespace, String level) {
+ log.debug("Writing level {} for logging namespace {} to Kafka", level, namespace);
+ Struct value = new Struct(LOGGER_LEVEL_V0);
+ value.put("level", level);
+ byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
+ try {
+ configLog.sendWithReceipt(LOGGER_CLUSTER_KEY(namespace), serializedValue).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write logger level to Kafka", e);
+ throw new ConnectException("Error writing logger level to Kafka", e);
+ }
+ }
+
// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
String clusterId = config.kafkaClusterId();
@@ -901,6 +923,9 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
processTaskCountRecord(connectorName, value);
} else if (record.key().equals(SESSION_KEY_KEY)) {
processSessionKeyRecord(value);
+ } else if (record.key().startsWith(LOGGER_CLUSTER_PREFIX)) {
+ String loggingNamespace = record.key().substring(LOGGER_CLUSTER_PREFIX.length());
+ processLoggerLevelRecord(loggingNamespace, value);
} else {
log.error("Discarding config update record with invalid key: {}", record.key());
}
@@ -1185,6 +1210,37 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
}
+ private void processLoggerLevelRecord(String namespace, SchemaAndValue value) {
+ if (value.value() == null) {
+ log.error("Ignoring logging level for namespace {} because it is unexpectedly null", namespace);
+ return;
+ }
+ if (!(value.value() instanceof Map)) {
+ log.error("Ignoring logging level for namespace {} because the value is not a Map but is {}", namespace, className(value.value()));
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+ Object level = valueAsMap.get("level");
+ if (!(level instanceof String)) {
+ log.error("Invalid data for logging level key 'level' field with namespace {}; should be a String but it is {}", namespace, className(level));
+ return;
+ }
+
+ if (started) {
+ updateListener.onLoggingLevelUpdate(namespace, (String) level);
+ } else {
+ // TRACE level since there may be many of these records in the config topic
+ log.trace(
+ "Ignoring old logging level {} for namespace {} that was writen to the config topic before this worker completed startup",
+ level,
+ namespace
+ );
+ }
+ }
+
private ConnectorTaskId parseTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 3) return null;
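
The LOGGER_CLUSTER_KEY convention above embeds the logging namespace in the record key itself. The record-processing code recovers the namespace by stripping the fixed-length prefix with substring() rather than splitting on '-', which works regardless of the characters the namespace contains. A self-contained sketch (illustrative only, not part of the patch):

    public class LoggerClusterKeyExample {
        static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-";

        public static void main(String[] args) {
            String key = LOGGER_CLUSTER_PREFIX + "org.apache.kafka.connect";
            System.out.println(key); // logger-cluster-org.apache.kafka.connect

            // Same recovery as the config-record processing above: strip the prefix
            String namespace = key.substring(LOGGER_CLUSTER_PREFIX.length());
            System.out.println(namespace); // org.apache.kafka.connect
        }
    }
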
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index dcdfd71296b..52c360c3b33 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -169,6 +169,11 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
// no-op
}
+ @Override
+ public void putLoggerLevel(String namespace, String level) {
+ // no-op
+ }
+
@Override
public synchronized void setUpdateListener(UpdateListener listener) {
this.updateListener = listener;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 42c3831faf4..cd9b9c05171 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -54,9 +54,9 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
-import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index e399eee2ae9..26b4eb11417 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -53,7 +53,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
-import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 26b2d7cba16..4eac236810a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -48,7 +48,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions;
+import org.apache.kafka.connect.util.clusters.ConnectAssertions;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -650,7 +650,7 @@ public class ExactlyOnceSourceIntegrationTest {
final String globalOffsetsTopic = "connect-worker-offsets-topic";
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic);
- connectBuilder.clientConfigs(superUserClientConfig);
+ connectBuilder.clientProps(superUserClientConfig);
startConnect();
@@ -1095,7 +1095,7 @@ public class ExactlyOnceSourceIntegrationTest {
private void assertConnectorStarted(StartAndStopLatch connectorStart) throws InterruptedException {
assertTrue("Connector and tasks did not finish startup in time",
connectorStart.await(
- EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS,
+ ConnectAssertions.CONNECTOR_SETUP_DURATION_MS,
TimeUnit.MILLISECONDS
)
);
@@ -1105,7 +1105,7 @@ public class ExactlyOnceSourceIntegrationTest {
assertTrue(
"Connector and tasks did not finish shutdown in time",
connectorStop.await(
- EmbeddedConnectClusterAssertions.CONNECTOR_SHUTDOWN_DURATION_MS,
+ ConnectAssertions.CONNECTOR_SHUTDOWN_DURATION_MS,
TimeUnit.MILLISECONDS
)
);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
new file mode 100644
index 00000000000..ea938f9a4f6
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(IntegrationTest.class)
+public class StandaloneWorkerIntegrationTest {
+
+ private EmbeddedConnectStandalone connect;
+
+ @Before
+ public void setup() {
+ connect = new EmbeddedConnectStandalone.Builder()
+ .build();
+ connect.start();
+ }
+
+ @After
+ public void cleanup() {
+ connect.stop();
+ }
+
+ @Test
+ public void testDynamicLogging() {
+ Map<String, LoggerLevel> initialLevels = connect.allLogLevels();
+ assertFalse("Connect REST API did not list any known loggers", initialLevels.isEmpty());
+ Map<String, LoggerLevel> invalidModifiedLoggers = Utils.filterMap(
+ initialLevels,
+ StandaloneWorkerIntegrationTest::isModified
+ );
+ assertEquals(
+ "No loggers should have a non-null last-modified timestamp",
+ Collections.emptyMap(),
+ invalidModifiedLoggers
+ );
+
+ // Tests with no scope
+ // The current level may match the first level we set the namespace to,
+ // so we issue a preliminary request with a different level to guarantee that a
+ // change takes place and that the last-modified timestamp will be non-null
+ final String namespace1 = "org.apache.kafka.connect";
+ final String level1 = "DEBUG";
+ connect.setLogLevel(namespace1, "ERROR", null);
+ Map<String, LoggerLevel> currentLevels = testSetLoggingLevel(namespace1, level1, null, initialLevels);
+
+ // Tests with scope=worker
+ final String namespace2 = "org.apache.kafka.clients";
+ final String level2 = "INFO";
+ connect.setLogLevel(namespace2, "WARN", "worker");
+ currentLevels = testSetLoggingLevel(namespace2, level2, "worker", currentLevels);
+
+ LoggerLevel priorLoggerLevel = connect.getLogLevel(namespace2);
+ connect.setLogLevel(namespace2, level2, "worker");
+ LoggerLevel currentLoggerLevel = connect.getLogLevel(namespace2);
+ assertEquals(
+ "Log level and last-modified timestamp should not be affected by consecutive identical requests",
+ priorLoggerLevel,
+ currentLoggerLevel
+ );
+
+ // Tests with scope=cluster
+ final String namespace3 = "org.apache.kafka.streams";
+ final String level3 = "TRACE";
+ connect.setLogLevel(namespace3, "DEBUG", "cluster");
+ testSetLoggingLevel(namespace3, level3, "cluster", currentLevels);
+ }
+
+ private Map<String, LoggerLevel> testSetLoggingLevel(
+ String namespace,
+ String level,
+ String scope,
+ Map<String, LoggerLevel> initialLevels
+ ) {
+ long requestTime = System.currentTimeMillis();
+ List<String> affectedLoggers = connect.setLogLevel(namespace, level, scope);
+ if ("cluster".equals(scope)) {
+ assertNull(
+ "Modifying log levels with scope=cluster should result in an empty response",
+ affectedLoggers
+ );
+ } else {
+ assertTrue(affectedLoggers.contains(namespace));
+ List<String> invalidAffectedLoggers = affectedLoggers.stream()
+ .filter(l -> !l.startsWith(namespace))
+ .collect(Collectors.toList());
+ assertEquals(
+ "No loggers outside the namespace '" + namespace
+ + "' should have been included in the response for a request to modify that namespace",
+ Collections.emptyList(),
+ invalidAffectedLoggers
+ );
+ }
+
+ // Verify the information for this single logger
+
+ LoggerLevel loggerLevel = connect.getLogLevel(namespace);
+ assertNotNull(loggerLevel);
+ assertEquals(level, loggerLevel.level());
+ assertNotNull(loggerLevel.lastModified());
+ assertTrue(
+ "Last-modified timestamp for logger level is " + loggerLevel.lastModified()
+ + ", which is before " + requestTime + ", the most-recent time the level was adjusted",
+ loggerLevel.lastModified() >= requestTime
+ );
+
+ // Verify information for all listed loggers
+
+ Map<String, LoggerLevel> newLevels = connect.allLogLevels();
+
+ Map<String, LoggerLevel> invalidAffectedLoggerLevels = Utils.filterMap(
+ newLevels,
+ e -> hasNamespace(e, namespace)
+ && (!level(e).equals(level)
+ || !isModified(e)
+ || lastModified(e) < requestTime
+ )
+ );
+ assertEquals(
+ "At least one logger in the affected namespace '" + namespace
+ + "' does not have the expected level of '" + level
+ + "', has a null last-modified timestamp, or has a last-modified timestamp "
+ + "that is less recent than " + requestTime
+ + ", which is when the namespace was last adjusted",
+ Collections.emptyMap(),
+ invalidAffectedLoggerLevels
+ );
+
+ Set<String> droppedLoggers = Utils.diff(HashSet::new, initialLevels.keySet(), newLevels.keySet());
+ assertEquals(
+ "At least one logger was present in the listing of all loggers "
+ + "before the logging level for namespace '" + namespace
+ + "' was set to '" + level
+ + "' that is no longer present",
+ Collections.emptySet(),
+ droppedLoggers
+ );
+
+ Map<String, LoggerLevel> invalidUnaffectedLoggerLevels = Utils.filterMap(
+ newLevels,
+ e -> !hasNamespace(e, namespace) && !e.getValue().equals(initialLevels.get(e.getKey()))
+ );
+ assertEquals(
+ "At least one logger outside of the affected namespace '" + namespace
+ + "' has a different logging level or last-modified timestamp than it did "
+ + "before the namespace was set to level '" + level
+ + "'; none of these loggers should have been affected",
+ Collections.emptyMap(),
+ invalidUnaffectedLoggerLevels
+ );
+
+ return newLevels;
+ }
+
+ private static boolean hasNamespace(Map.Entry<String, LoggerLevel> entry, String namespace) {
+ return entry.getKey().startsWith(namespace);
+ }
+
+ private static boolean isModified(Map.Entry<String, LoggerLevel> entry) {
+ return lastModified(entry) != null;
+ }
+
+ private static Long lastModified(Map.Entry<String, LoggerLevel> entry) {
+ return entry.getValue().lastModified();
+ }
+
+ private static String level(Map.Entry<String, LoggerLevel> entry) {
+ return entry.getValue().level();
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 6b06c43ce3d..eb86920adff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -176,9 +177,7 @@ public class AbstractHerderTest {
@Test
public void testConnectors() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(configStore.snapshot()).thenReturn(SNAPSHOT);
assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
@@ -188,9 +187,7 @@ public class AbstractHerderTest {
public void testConnectorClientConfigOverridePolicyClose() {
SampleConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy();
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder(noneConnectorClientConfigOverridePolicy);
herder.stopServices();
assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
@@ -200,9 +197,7 @@ public class AbstractHerderTest {
public void testConnectorStatus() {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(plugins.newConnector(anyString())).thenReturn(new SampleSourceConnector());
when(herder.plugins()).thenReturn(plugins);
@@ -236,9 +231,7 @@ public class AbstractHerderTest {
public void testConnectorStatusMissingPlugin() {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(plugins.newConnector(anyString())).thenThrow(new ConnectException("Unable to find class"));
when(herder.plugins()).thenReturn(plugins);
@@ -269,9 +262,7 @@ public class AbstractHerderTest {
@Test
public void testConnectorInfo() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(plugins.newConnector(anyString())).thenReturn(new SampleSourceConnector());
when(herder.plugins()).thenReturn(plugins);
@@ -288,9 +279,7 @@ public class AbstractHerderTest {
@Test
public void testPauseConnector() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(configStore.contains(CONN1)).thenReturn(true);
@@ -301,9 +290,7 @@ public class AbstractHerderTest {
@Test
public void testResumeConnector() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(configStore.contains(CONN1)).thenReturn(true);
@@ -314,9 +301,7 @@ public class AbstractHerderTest {
@Test
public void testConnectorInfoMissingPlugin() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(plugins.newConnector(anyString())).thenThrow(new ConnectException("No class found"));
when(herder.plugins()).thenReturn(plugins);
@@ -336,9 +321,7 @@ public class AbstractHerderTest {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
String workerId = "workerId";
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
final ArgumentCaptor<TaskStatus> taskStatusArgumentCaptor = ArgumentCaptor.forClass(TaskStatus.class);
doNothing().when(statusStore).putSafe(taskStatusArgumentCaptor.capture());
@@ -358,9 +341,7 @@ public class AbstractHerderTest {
public void testBuildRestartPlanForUnknownConnector() {
String connectorName = "UnknownConnector";
RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(statusStore.get(connectorName)).thenReturn(null);
@@ -415,9 +396,7 @@ public class AbstractHerderTest {
taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(herder.rawConfig(connectorName)).thenReturn(null);
@@ -447,9 +426,7 @@ public class AbstractHerderTest {
taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(herder.rawConfig(connectorName)).thenReturn(null);
@@ -1051,9 +1028,7 @@ public class AbstractHerderTest {
Function pluginConfig,
Optional baseConfig
) throws ClassNotFoundException {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(plugins.pluginClass(pluginName)).then(invocation -> newPluginInstance.get().getClass());
when(plugins.newPlugin(anyString())).then(invocation -> newPluginInstance.get());
@@ -1073,9 +1048,7 @@ public class AbstractHerderTest {
@Test(expected = NotFoundException.class)
public void testGetConnectorConfigDefWithBadName() throws Exception {
String connName = "AnotherPlugin";
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(worker.getPlugins()).thenReturn(plugins);
when(plugins.pluginClass(anyString())).thenThrow(new ClassNotFoundException());
herder.connectorPluginConfig(connName);
@@ -1085,9 +1058,7 @@ public class AbstractHerderTest {
@SuppressWarnings({"rawtypes", "unchecked"})
public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
String connName = "AnotherPlugin";
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(worker.getPlugins()).thenReturn(plugins);
when(plugins.pluginClass(anyString())).thenReturn((Class) Object.class);
when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider());
@@ -1097,9 +1068,7 @@ public class AbstractHerderTest {
@Test
public void testGetConnectorTypeWithMissingPlugin() {
String connName = "AnotherPlugin";
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(worker.getPlugins()).thenReturn(plugins);
when(plugins.newConnector(anyString())).thenThrow(new ConnectException("No class found"));
assertEquals(ConnectorType.UNKNOWN, herder.connectorType(Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connName)));
@@ -1107,26 +1076,20 @@ public class AbstractHerderTest {
@Test
public void testGetConnectorTypeWithNullConfig() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
assertEquals(ConnectorType.UNKNOWN, herder.connectorType(null));
}
@Test
public void testGetConnectorTypeWithEmptyConfig() {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
assertEquals(ConnectorType.UNKNOWN, herder.connectorType(Collections.emptyMap()));
}
@Test
public void testConnectorOffsetsConnectorNotFound() {
when(configStore.snapshot()).thenReturn(SNAPSHOT);
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
herder.connectorOffsets("unknown-connector", cb);
ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
@@ -1145,9 +1108,7 @@ public class AbstractHerderTest {
workerCallback.getValue().onCompletion(null, offsets);
return null;
}).when(worker).connectorOffsets(eq(CONN1), eq(CONN1_CONFIG), workerCallback.capture());
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder();
when(configStore.snapshot()).thenReturn(SNAPSHOT);
FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
@@ -1210,9 +1171,7 @@ public class AbstractHerderTest {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
int countOfCallingNewConnector) {
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ AbstractHerder herder = testHerder(connectorClientConfigOverridePolicy);
// Call to validateConnectorConfig
when(worker.configTransformer()).thenReturn(transformer);
@@ -1232,6 +1191,16 @@ public class AbstractHerderTest {
return herder;
}
+ private AbstractHerder testHerder() {
+ return testHerder(noneConnectorClientConfigOverridePolicy);
+ }
+
+ private AbstractHerder testHerder(ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ return mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy, Time.SYSTEM)
+ .defaultAnswer(CALLS_REAL_METHODS));
+ }
+
private void mockValidationIsolation(String connectorClass, Connector connector) {
when(plugins.newConnector(connectorClass)).thenReturn(connector);
when(plugins.connectorLoader(connectorClass)).thenReturn(classLoader);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java
new file mode 100644
index 00000000000..184724ef255
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Hierarchy;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class LoggersTest {
+
+ private static final long INITIAL_TIME = 1696951712135L;
+ private Time time;
+
+ @Before
+ public void setup() {
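+ // MockTime with zero auto-tick: the clock starts at INITIAL_TIME and only advances
+ // via explicit time.sleep() calls, keeping last-modified timestamps deterministic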
+ time = new MockTime(0, INITIAL_TIME, 0);
+ }
+
+ @Test
+ public void testGetLoggersIgnoresNullLevels() {
+ Logger root = logger("root");
+
+ Logger a = logger("a");
+ a.setLevel(null);
+ Logger b = logger("b");
+ b.setLevel(Level.INFO);
+
+ Loggers loggers = new TestLoggers(root, a, b);
+
+ Map<String, LoggerLevel> expectedLevels = Collections.singletonMap(
+ "b",
+ new LoggerLevel(Level.INFO.toString(), null)
+ );
+ Map<String, LoggerLevel> actualLevels = loggers.allLevels();
+ assertEquals(expectedLevels, actualLevels);
+ }
+
+ @Test
+ public void testGetLoggerFallsBackToEffectiveLogLevel() {
+ Logger root = logger("root");
+ root.setLevel(Level.ERROR);
+
+ Hierarchy hierarchy = new Hierarchy(root);
+ Logger a = hierarchy.getLogger("a");
+ a.setLevel(null);
+ Logger b = hierarchy.getLogger("b");
+ b.setLevel(Level.INFO);
+
+ Loggers loggers = new TestLoggers(root, a, b);
+
+ LoggerLevel expectedLevel = new LoggerLevel(Level.ERROR.toString(), null);
+ LoggerLevel actualLevel = loggers.level("a");
+ assertEquals(expectedLevel, actualLevel);
+ }
+
+ @Test
+ public void testGetUnknownLogger() {
+ Logger root = logger("root");
+ root.setLevel(Level.ERROR);
+
+ Hierarchy hierarchy = new Hierarchy(root);
+ Logger a = hierarchy.getLogger("a");
+ a.setLevel(null);
+ Logger b = hierarchy.getLogger("b");
+ b.setLevel(Level.INFO);
+
+ Loggers loggers = new TestLoggers(root, a, b);
+
+ LoggerLevel level = loggers.level("c");
+ assertNull(level);
+ }
+
+ @Test
+ public void testSetLevel() {
+ Logger root = logger("root");
+ root.setLevel(Level.ERROR);
+
+ Logger x = logger("a.b.c.p.X");
+ Logger y = logger("a.b.c.p.Y");
+ Logger z = logger("a.b.c.p.Z");
+ Logger w = logger("a.b.c.s.W");
+ x.setLevel(Level.INFO);
+ y.setLevel(Level.INFO);
+ z.setLevel(Level.INFO);
+ w.setLevel(Level.INFO);
+
+ // We don't explicitly register a logger for a.b.c.p, so it won't appear in the list of current loggers;
+ // one should be created by the Loggers instance when we set the level
+ TestLoggers loggers = new TestLoggers(root, x, y, z, w);
+
+ List<String> modified = loggers.setLevel("a.b.c.p", Level.DEBUG);
+ assertEquals(Arrays.asList("a.b.c.p", "a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z"), modified);
+ assertEquals(Level.DEBUG.toString(), loggers.level("a.b.c.p").level());
+ assertEquals(Level.DEBUG, x.getLevel());
+ assertEquals(Level.DEBUG, y.getLevel());
+ assertEquals(Level.DEBUG, z.getLevel());
+
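+ // The ancestor logger created on the fly should be stamped with the time its level was set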
+ LoggerLevel expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+ LoggerLevel actualLevel = loggers.level("a.b.c.p");
+ assertEquals(expectedLevel, actualLevel);
+
+ // Sleep a little and adjust the level of a leaf logger
+ time.sleep(10);
+ loggers.setLevel("a.b.c.p.X", Level.ERROR);
+ expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
+ actualLevel = loggers.level("a.b.c.p.X");
+ assertEquals(expectedLevel, actualLevel);
+
+ // Make sure that the direct parent logger and a sibling logger remain unaffected
+ expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+ actualLevel = loggers.level("a.b.c.p");
+ assertEquals(expectedLevel, actualLevel);
+
+ expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+ actualLevel = loggers.level("a.b.c.p.Y");
+ assertEquals(expectedLevel, actualLevel);
+
+ // Set the same level again, and verify that the last modified time hasn't been altered
+ time.sleep(10);
+ loggers.setLevel("a.b.c.p.X", Level.ERROR);
+ expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
+ actualLevel = loggers.level("a.b.c.p.X");
+ assertEquals(expectedLevel, actualLevel);
+ }
+
+ @Test
+ public void testSetRootLevel() {
+ Logger root = logger("root");
+ root.setLevel(Level.ERROR);
+
+ Logger p = logger("a.b.c.p");
+ Logger x = logger("a.b.c.p.X");
+ Logger y = logger("a.b.c.p.Y");
+ Logger z = logger("a.b.c.p.Z");
+ Logger w = logger("a.b.c.s.W");
+ x.setLevel(Level.INFO);
+ y.setLevel(Level.INFO);
+ z.setLevel(Level.INFO);
+ w.setLevel(Level.INFO);
+
+ Loggers loggers = new TestLoggers(root, x, y, z, w);
+
+ List<String> modified = loggers.setLevel("root", Level.DEBUG);
+ assertEquals(Arrays.asList("a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z", "a.b.c.s.W", "root"), modified);
+
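+ // a.b.c.p was never registered with the Loggers instance and never given an explicit
+ // level, so adjusting the root level should leave its own level unset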
+ assertNull(p.getLevel());
+
+ assertEquals(Level.DEBUG, root.getLevel());
+
+ assertEquals(Level.DEBUG, w.getLevel());
+ assertEquals(Level.DEBUG, x.getLevel());
+ assertEquals(Level.DEBUG, y.getLevel());
+ assertEquals(Level.DEBUG, z.getLevel());
+
+ Map<String, LoggerLevel> expectedLevels = new HashMap<>();
+ expectedLevels.put("root", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+ expectedLevels.put("a.b.c.p.X", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+ expectedLevels.put("a.b.c.p.Y", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+ expectedLevels.put("a.b.c.p.Z", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+ expectedLevels.put("a.b.c.s.W", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+
+ Map<String, LoggerLevel> actualLevels = loggers.allLevels();
+ assertEquals(expectedLevels, actualLevels);
+ }
+
+ @Test
+ public void testSetLevelNullArguments() {
+ Logger root = logger("root");
+ Loggers loggers = new TestLoggers(root);
+ assertThrows(NullPointerException.class, () -> loggers.setLevel(null, Level.INFO));
+ assertThrows(NullPointerException.class, () -> loggers.setLevel("root", null));
+ }
+
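+ /**
+ * Loggers subclass whose logger-lookup hooks are overridden so that tests run against
+ * an isolated, in-memory set of loggers instead of the global Log4j hierarchy.
+ */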
+ private class TestLoggers extends Loggers {
+
+ private final Logger rootLogger;
+ private final Map<String, Logger> currentLoggers;
+
+ public TestLoggers(Logger rootLogger, Logger... knownLoggers) {
+ super(time);
+ this.rootLogger = rootLogger;
+ this.currentLoggers = new HashMap<>(Stream.of(knownLoggers)
+ .collect(Collectors.toMap(
+ Logger::getName,
+ Function.identity()
+ )));
+ }
+
+ @Override
+ Logger lookupLogger(String logger) {
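+ // Logger's constructor is protected, so an anonymous subclass stands in for real loggers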
+ return currentLoggers.computeIfAbsent(logger, l -> new Logger(logger) { });
+ }
+
+ @Override
+ Enumeration<Logger> currentLoggers() {
+ return new Vector<>(currentLoggers.values()).elements();
+ }
+
+ @Override
+ Logger rootLogger() {
+ return rootLogger;
+ }
+ }
+
+ private Logger logger(String name) {
+ return new Logger(name) { };
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index 2b78900bc25..f2978678bbf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -56,7 +57,6 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -247,15 +247,18 @@ public class ConnectRestServerTest {
}
@Test
- public void testLoggersEndpointWithDefaults() throws IOException {
+ public void testLoggerEndpointWithDefaults() throws IOException {
Map<String, String> configMap = new HashMap<>(baseServerProps());
+ final String logger = "a.b.c.s.W";
+ final String loggingLevel = "INFO";
+ final long lastModified = 789052637671L;
+
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions();
-
- // create some loggers in the process
- LoggerFactory.getLogger("a.b.c.s.W");
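+ // Stub the mocked herder's logging methods so the REST endpoints can be exercised
+ // without mutating real Log4j state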
+ doReturn(Collections.emptyList()).when(herder).setWorkerLoggerLevel(logger, loggingLevel);
+ doReturn(Collections.singletonMap(logger, new LoggerLevel(loggingLevel, lastModified))).when(herder).allLoggerLevels();
server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
@@ -265,14 +268,16 @@ public class ConnectRestServerTest {
URI serverUrl = server.advertisedUrl();
- executePut(serverUrl, "/admin/loggers/a.b.c.s.W", "{\"level\": \"INFO\"}");
+ executePut(serverUrl, "/admin/loggers/" + logger, "{\"level\": \"" + loggingLevel + "\"}");
String responseStr = executeGet(serverUrl, "/admin/loggers");
- Map<String, Map<String, ?>> loggers = mapper.readValue(responseStr, new TypeReference