diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 9838e7dc8fe..b965b91f0fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -83,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS; @@ -171,13 +173,23 @@ public final class ConsumerCoordinator extends AbstractCoordinator { metricGrpPrefix, time); this.rebalanceConfig = rebalanceConfig; - this.log = logContext.logger(ConsumerCoordinator.class); + final Supplier dynamicPrefix = + () -> logContext.logPrefix() + "[generationId=" + generation().generationId + "] "; + this.log = new DynamicPrefixLogger( + dynamicPrefix, + LoggerFactory.getLogger(ConsumerCoordinator.class) + ); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); this.subscriptions = subscriptions; this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback(); this.autoCommitEnabled = autoCommitEnabled; this.autoCommitIntervalMs = autoCommitIntervalMs; + for (final ConsumerPartitionAssignor assignor : assignors) { + if (assignor instanceof ContextualLogging) { + ((ContextualLogging) assignor).setLoggingContext(dynamicPrefix); + } + } this.assignors = assignors; this.completedOffsetCommits = new ConcurrentLinkedQueue<>(); this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix); @@ -677,8 +689,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { isLeader = true; if (skipAssignment) { - log.info("Skipped assignment for returning static leader at generation {}. The static leader " + - "will continue with its existing assignment.", generation().generationId); + log.info("Skipped assignment for returning static leader. The static leader " + + "will continue with its existing assignment."); assignmentSnapshot = metadataSnapshot; return Collections.emptyMap(); } @@ -699,7 +711,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we must take the assignment snapshot after. assignmentSnapshot = metadataSnapshot; - log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments); + log.info("Finished assignment for group: {}", assignments); Map groupAssignment = new HashMap<>(); for (Map.Entry assignmentEntry : assignments.entrySet()) { @@ -1218,7 +1230,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private RequestFuture maybeAutoCommitOffsetsAsync() { if (autoCommitEnabled) return autoCommitOffsetsAsync(); - return null; + return null; } private class DefaultOffsetCommitCallback implements OffsetCommitCallback { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java new file mode 100644 index 00000000000..c0765549a9f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.function.Supplier; + +public interface ContextualLogging { + void setLoggingContext(Supplier loggingContext); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java new file mode 100644 index 00000000000..e2d9252d2b0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.slf4j.Logger; +import org.slf4j.Marker; + +import java.util.function.Supplier; + +public final class DynamicPrefixLogger implements Logger { + + private final Supplier prefix; + private final Logger delegate; + + public DynamicPrefixLogger(final Supplier prefix, final Logger delegate) { + this.prefix = prefix; + this.delegate = delegate; + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public boolean isTraceEnabled() { + return delegate.isTraceEnabled(); + } + + @Override + public void trace(final String msg) { + delegate.trace(prefix.get() + msg); + } + + @Override + public void trace(final String format, final Object arg) { + delegate.trace(prefix.get() + format, arg); + } + + @Override + public void trace(final String format, final Object arg1, final Object arg2) { + delegate.trace(prefix.get() + format, arg1, arg2); + } + + @Override + public void trace(final String format, final Object... arguments) { + delegate.trace(prefix.get() + format, arguments); + } + + @Override + public void trace(final String msg, final Throwable t) { + delegate.trace(prefix.get() + msg, t); + } + + @Override + public boolean isTraceEnabled(final Marker marker) { + return delegate.isTraceEnabled(marker); + } + + @Override + public void trace(final Marker marker, final String msg) { + delegate.trace(marker, prefix.get() + msg); + } + + @Override + public void trace(final Marker marker, final String format, final Object arg) { + delegate.trace(marker, prefix.get() + format, arg); + } + + @Override + public void trace(final Marker marker, + final String format, + final Object arg1, + final Object arg2) { + delegate.trace(marker, prefix.get() + format, arg1, arg2); + } + + @Override + public void trace(final Marker marker, final String format, final Object... argArray) { + delegate.trace(marker, prefix.get() + format, argArray); + } + + @Override + public void trace(final Marker marker, final String msg, final Throwable t) { + delegate.trace(marker, prefix.get() + msg, t); + } + + @Override + public boolean isDebugEnabled() { + return delegate.isDebugEnabled(); + } + + @Override + public void debug(final String msg) { + delegate.debug(prefix.get() + msg); + } + + @Override + public void debug(final String format, final Object arg) { + delegate.debug(prefix.get() + format, arg); + } + + @Override + public void debug(final String format, final Object arg1, final Object arg2) { + delegate.debug(prefix.get() + format, arg1, arg2); + } + + @Override + public void debug(final String format, final Object... arguments) { + delegate.debug(prefix.get() + format, arguments); + } + + @Override + public void debug(final String msg, final Throwable t) { + delegate.debug(prefix.get() + msg, t); + } + + @Override + public boolean isDebugEnabled(final Marker marker) { + return delegate.isDebugEnabled(marker); + } + + @Override + public void debug(final Marker marker, final String msg) { + delegate.debug(marker, prefix.get() + msg); + } + + @Override + public void debug(final Marker marker, final String format, final Object arg) { + delegate.debug(marker, prefix.get() + format, arg); + } + + @Override + public void debug(final Marker marker, + final String format, + final Object arg1, + final Object arg2) { + delegate.debug(marker, prefix.get() + format, arg1, arg2); + } + + @Override + public void debug(final Marker marker, final String format, final Object... arguments) { + delegate.debug(marker, prefix.get() + format, arguments); + } + + @Override + public void debug(final Marker marker, final String msg, final Throwable t) { + delegate.debug(marker, prefix.get() + msg, t); + } + + @Override + public boolean isInfoEnabled() { + return delegate.isInfoEnabled(); + } + + @Override + public void info(final String msg) { + delegate.info(prefix.get() + msg); + } + + @Override + public void info(final String format, final Object arg) { + delegate.info(prefix.get() + format, arg); + } + + @Override + public void info(final String format, final Object arg1, final Object arg2) { + delegate.info(prefix.get() + format, arg1, arg2); + } + + @Override + public void info(final String format, final Object... arguments) { + delegate.info(prefix.get() + format, arguments); + } + + @Override + public void info(final String msg, final Throwable t) { + delegate.info(prefix.get() + msg, t); + } + + @Override + public boolean isInfoEnabled(final Marker marker) { + return delegate.isInfoEnabled(marker); + } + + @Override + public void info(final Marker marker, final String msg) { + delegate.info(marker, prefix.get() + msg); + } + + @Override + public void info(final Marker marker, final String format, final Object arg) { + delegate.info(marker, prefix.get() + format, arg); + } + + @Override + public void info(final Marker marker, + final String format, + final Object arg1, + final Object arg2) { + delegate.info(marker, prefix.get() + format, arg1, arg2); + } + + @Override + public void info(final Marker marker, final String format, final Object... arguments) { + delegate.info(marker, prefix.get() + format, arguments); + } + + @Override + public void info(final Marker marker, final String msg, final Throwable t) { + delegate.info(marker, prefix.get() + msg, t); + } + + @Override + public boolean isWarnEnabled() { + return delegate.isWarnEnabled(); + } + + @Override + public void warn(final String msg) { + delegate.warn(prefix.get() + msg); + } + + @Override + public void warn(final String format, final Object arg) { + delegate.warn(prefix.get() + format, arg); + } + + @Override + public void warn(final String format, final Object... arguments) { + delegate.warn(prefix.get() + format, arguments); + } + + @Override + public void warn(final String format, final Object arg1, final Object arg2) { + delegate.warn(prefix.get() + format, arg1, arg2); + } + + @Override + public void warn(final String msg, final Throwable t) { + delegate.warn(prefix.get() + msg, t); + } + + @Override + public boolean isWarnEnabled(final Marker marker) { + return delegate.isWarnEnabled(marker); + } + + @Override + public void warn(final Marker marker, final String msg) { + delegate.warn(marker, prefix.get() + msg); + } + + @Override + public void warn(final Marker marker, final String format, final Object arg) { + delegate.warn(marker, prefix.get() + format, arg); + } + + @Override + public void warn(final Marker marker, + final String format, + final Object arg1, + final Object arg2) { + delegate.warn(marker, prefix.get() + format, arg1, arg2); + } + + @Override + public void warn(final Marker marker, final String format, final Object... arguments) { + delegate.warn(marker, prefix.get() + format, arguments); + } + + @Override + public void warn(final Marker marker, final String msg, final Throwable t) { + delegate.warn(marker, prefix.get() + msg, t); + } + + @Override + public boolean isErrorEnabled() { + return delegate.isErrorEnabled(); + } + + @Override + public void error(final String msg) { + delegate.error(prefix.get() + msg); + } + + @Override + public void error(final String format, final Object arg) { + delegate.error(prefix.get() + format, arg); + } + + @Override + public void error(final String format, final Object arg1, final Object arg2) { + delegate.error(prefix.get() + format, arg1, arg2); + } + + @Override + public void error(final String format, final Object... arguments) { + delegate.error(prefix.get() + format, arguments); + } + + @Override + public void error(final String msg, final Throwable t) { + delegate.error(prefix.get() + msg, t); + } + + @Override + public boolean isErrorEnabled(final Marker marker) { + return delegate.isErrorEnabled(marker); + } + + @Override + public void error(final Marker marker, final String msg) { + delegate.error(marker, prefix.get() + msg); + } + + @Override + public void error(final Marker marker, final String format, final Object arg) { + delegate.error(marker, format, arg); + } + + @Override + public void error(final Marker marker, + final String format, + final Object arg1, + final Object arg2) { + delegate.error(marker, prefix.get() + format, arg1, arg2); + } + + @Override + public void error(final Marker marker, final String format, final Object... arguments) { + delegate.error(marker, prefix.get() + format, arguments); + } + + @Override + public void error(final Marker marker, final String msg, final Throwable t) { + delegate.error(marker, prefix.get() + msg, t); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 288d5d21fe9..c68e5ee3499 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ContextualLogging; +import org.apache.kafka.clients.consumer.internals.DynamicPrefixLogger; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; @@ -29,7 +31,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.MissingSourceTopicException; @@ -52,6 +53,7 @@ import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.time.Instant; @@ -78,7 +80,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.UUID.randomUUID; - import static org.apache.kafka.common.utils.Utils.filterMap; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture; @@ -87,10 +88,21 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAss import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; -public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable { +public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable, + ContextualLogging { - private Logger log; + // set first via configure() private String logPrefix; + // overwritten via setLoggingContext() if it is called. + private Logger log = LoggerFactory.getLogger(StreamsPartitionAssignor.class); + + @Override + public void setLoggingContext(final Supplier loggingContext) { + this.log = new DynamicPrefixLogger( + () -> loggingContext.get() + logPrefix, + LoggerFactory.getLogger(StreamsPartitionAssignor.class) + ); + } private static class AssignedPartition implements Comparable { @@ -204,7 +216,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs); logPrefix = assignorConfiguration.logPrefix(); - log = new LogContext(logPrefix).logger(getClass()); usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion); final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();