MINOR: Add generation to consumer assignor logs

This commit is contained in:
John Roesler 2022-04-08 13:06:52 -05:00 committed by John Roesler
parent 8380d2edf4
commit 8eaca9087a
4 changed files with 409 additions and 10 deletions

View File

@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -83,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@ -171,13 +173,23 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
metricGrpPrefix,
time);
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
final Supplier<String> dynamicPrefix =
() -> logContext.logPrefix() + "[generationId=" + generation().generationId + "] ";
this.log = new DynamicPrefixLogger(
dynamicPrefix,
LoggerFactory.getLogger(ConsumerCoordinator.class)
);
this.metadata = metadata;
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
this.autoCommitEnabled = autoCommitEnabled;
this.autoCommitIntervalMs = autoCommitIntervalMs;
for (final ConsumerPartitionAssignor assignor : assignors) {
if (assignor instanceof ContextualLogging) {
((ContextualLogging) assignor).setLoggingContext(dynamicPrefix);
}
}
this.assignors = assignors;
this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
@ -677,8 +689,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
isLeader = true;
if (skipAssignment) {
log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
"will continue with its existing assignment.", generation().generationId);
log.info("Skipped assignment for returning static leader. The static leader " +
"will continue with its existing assignment.");
assignmentSnapshot = metadataSnapshot;
return Collections.emptyMap();
}
@ -699,7 +711,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// we must take the assignment snapshot after.
assignmentSnapshot = metadataSnapshot;
log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
log.info("Finished assignment for group: {}", assignments);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
@ -1218,7 +1230,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
return null;
return null;
}
private class DefaultOffsetCommitCallback implements OffsetCommitCallback {

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.function.Supplier;
public interface ContextualLogging {
void setLoggingContext(Supplier<String> loggingContext);
}

View File

@ -0,0 +1,353 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.slf4j.Logger;
import org.slf4j.Marker;
import java.util.function.Supplier;
public final class DynamicPrefixLogger implements Logger {
private final Supplier<String> prefix;
private final Logger delegate;
public DynamicPrefixLogger(final Supplier<String> prefix, final Logger delegate) {
this.prefix = prefix;
this.delegate = delegate;
}
@Override
public String getName() {
return delegate.getName();
}
@Override
public boolean isTraceEnabled() {
return delegate.isTraceEnabled();
}
@Override
public void trace(final String msg) {
delegate.trace(prefix.get() + msg);
}
@Override
public void trace(final String format, final Object arg) {
delegate.trace(prefix.get() + format, arg);
}
@Override
public void trace(final String format, final Object arg1, final Object arg2) {
delegate.trace(prefix.get() + format, arg1, arg2);
}
@Override
public void trace(final String format, final Object... arguments) {
delegate.trace(prefix.get() + format, arguments);
}
@Override
public void trace(final String msg, final Throwable t) {
delegate.trace(prefix.get() + msg, t);
}
@Override
public boolean isTraceEnabled(final Marker marker) {
return delegate.isTraceEnabled(marker);
}
@Override
public void trace(final Marker marker, final String msg) {
delegate.trace(marker, prefix.get() + msg);
}
@Override
public void trace(final Marker marker, final String format, final Object arg) {
delegate.trace(marker, prefix.get() + format, arg);
}
@Override
public void trace(final Marker marker,
final String format,
final Object arg1,
final Object arg2) {
delegate.trace(marker, prefix.get() + format, arg1, arg2);
}
@Override
public void trace(final Marker marker, final String format, final Object... argArray) {
delegate.trace(marker, prefix.get() + format, argArray);
}
@Override
public void trace(final Marker marker, final String msg, final Throwable t) {
delegate.trace(marker, prefix.get() + msg, t);
}
@Override
public boolean isDebugEnabled() {
return delegate.isDebugEnabled();
}
@Override
public void debug(final String msg) {
delegate.debug(prefix.get() + msg);
}
@Override
public void debug(final String format, final Object arg) {
delegate.debug(prefix.get() + format, arg);
}
@Override
public void debug(final String format, final Object arg1, final Object arg2) {
delegate.debug(prefix.get() + format, arg1, arg2);
}
@Override
public void debug(final String format, final Object... arguments) {
delegate.debug(prefix.get() + format, arguments);
}
@Override
public void debug(final String msg, final Throwable t) {
delegate.debug(prefix.get() + msg, t);
}
@Override
public boolean isDebugEnabled(final Marker marker) {
return delegate.isDebugEnabled(marker);
}
@Override
public void debug(final Marker marker, final String msg) {
delegate.debug(marker, prefix.get() + msg);
}
@Override
public void debug(final Marker marker, final String format, final Object arg) {
delegate.debug(marker, prefix.get() + format, arg);
}
@Override
public void debug(final Marker marker,
final String format,
final Object arg1,
final Object arg2) {
delegate.debug(marker, prefix.get() + format, arg1, arg2);
}
@Override
public void debug(final Marker marker, final String format, final Object... arguments) {
delegate.debug(marker, prefix.get() + format, arguments);
}
@Override
public void debug(final Marker marker, final String msg, final Throwable t) {
delegate.debug(marker, prefix.get() + msg, t);
}
@Override
public boolean isInfoEnabled() {
return delegate.isInfoEnabled();
}
@Override
public void info(final String msg) {
delegate.info(prefix.get() + msg);
}
@Override
public void info(final String format, final Object arg) {
delegate.info(prefix.get() + format, arg);
}
@Override
public void info(final String format, final Object arg1, final Object arg2) {
delegate.info(prefix.get() + format, arg1, arg2);
}
@Override
public void info(final String format, final Object... arguments) {
delegate.info(prefix.get() + format, arguments);
}
@Override
public void info(final String msg, final Throwable t) {
delegate.info(prefix.get() + msg, t);
}
@Override
public boolean isInfoEnabled(final Marker marker) {
return delegate.isInfoEnabled(marker);
}
@Override
public void info(final Marker marker, final String msg) {
delegate.info(marker, prefix.get() + msg);
}
@Override
public void info(final Marker marker, final String format, final Object arg) {
delegate.info(marker, prefix.get() + format, arg);
}
@Override
public void info(final Marker marker,
final String format,
final Object arg1,
final Object arg2) {
delegate.info(marker, prefix.get() + format, arg1, arg2);
}
@Override
public void info(final Marker marker, final String format, final Object... arguments) {
delegate.info(marker, prefix.get() + format, arguments);
}
@Override
public void info(final Marker marker, final String msg, final Throwable t) {
delegate.info(marker, prefix.get() + msg, t);
}
@Override
public boolean isWarnEnabled() {
return delegate.isWarnEnabled();
}
@Override
public void warn(final String msg) {
delegate.warn(prefix.get() + msg);
}
@Override
public void warn(final String format, final Object arg) {
delegate.warn(prefix.get() + format, arg);
}
@Override
public void warn(final String format, final Object... arguments) {
delegate.warn(prefix.get() + format, arguments);
}
@Override
public void warn(final String format, final Object arg1, final Object arg2) {
delegate.warn(prefix.get() + format, arg1, arg2);
}
@Override
public void warn(final String msg, final Throwable t) {
delegate.warn(prefix.get() + msg, t);
}
@Override
public boolean isWarnEnabled(final Marker marker) {
return delegate.isWarnEnabled(marker);
}
@Override
public void warn(final Marker marker, final String msg) {
delegate.warn(marker, prefix.get() + msg);
}
@Override
public void warn(final Marker marker, final String format, final Object arg) {
delegate.warn(marker, prefix.get() + format, arg);
}
@Override
public void warn(final Marker marker,
final String format,
final Object arg1,
final Object arg2) {
delegate.warn(marker, prefix.get() + format, arg1, arg2);
}
@Override
public void warn(final Marker marker, final String format, final Object... arguments) {
delegate.warn(marker, prefix.get() + format, arguments);
}
@Override
public void warn(final Marker marker, final String msg, final Throwable t) {
delegate.warn(marker, prefix.get() + msg, t);
}
@Override
public boolean isErrorEnabled() {
return delegate.isErrorEnabled();
}
@Override
public void error(final String msg) {
delegate.error(prefix.get() + msg);
}
@Override
public void error(final String format, final Object arg) {
delegate.error(prefix.get() + format, arg);
}
@Override
public void error(final String format, final Object arg1, final Object arg2) {
delegate.error(prefix.get() + format, arg1, arg2);
}
@Override
public void error(final String format, final Object... arguments) {
delegate.error(prefix.get() + format, arguments);
}
@Override
public void error(final String msg, final Throwable t) {
delegate.error(prefix.get() + msg, t);
}
@Override
public boolean isErrorEnabled(final Marker marker) {
return delegate.isErrorEnabled(marker);
}
@Override
public void error(final Marker marker, final String msg) {
delegate.error(marker, prefix.get() + msg);
}
@Override
public void error(final Marker marker, final String format, final Object arg) {
delegate.error(marker, format, arg);
}
@Override
public void error(final Marker marker,
final String format,
final Object arg1,
final Object arg2) {
delegate.error(marker, prefix.get() + format, arg1, arg2);
}
@Override
public void error(final Marker marker, final String format, final Object... arguments) {
delegate.error(marker, prefix.get() + format, arguments);
}
@Override
public void error(final Marker marker, final String msg, final Throwable t) {
delegate.error(marker, prefix.get() + msg, t);
}
}

View File

@ -21,6 +21,8 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ContextualLogging;
import org.apache.kafka.clients.consumer.internals.DynamicPrefixLogger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
@ -29,7 +31,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
@ -52,6 +53,7 @@ import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Instant;
@ -78,7 +80,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@ -87,10 +88,21 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAss
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable,
ContextualLogging {
private Logger log;
// set first via configure()
private String logPrefix;
// overwritten via setLoggingContext() if it is called.
private Logger log = LoggerFactory.getLogger(StreamsPartitionAssignor.class);
@Override
public void setLoggingContext(final Supplier<String> loggingContext) {
this.log = new DynamicPrefixLogger(
() -> loggingContext.get() + logPrefix,
LoggerFactory.getLogger(StreamsPartitionAssignor.class)
);
}
private static class AssignedPartition implements Comparable<AssignedPartition> {
@ -204,7 +216,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();