diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 93953d815be..448853c6367 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -77,7 +77,7 @@ class ActiveTaskCreator { final String threadId, final int threadIdx, final UUID processId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { this.topologyMetadata = topologyMetadata; @@ -91,15 +91,12 @@ class ActiveTaskCreator { this.threadId = threadId; this.threadIdx = threadIdx; this.processId = processId; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); - final LogContext logContext = new LogContext(threadIdPrefix); - streamsProducer = new StreamsProducer( producer(), processingMode(applicationConfig), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 668bd832c86..eb8bcafea69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -52,20 +52,20 @@ class StandbyTaskCreator { final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); dummyCache = new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), + logContext, 0, streamsMetrics ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 85700074a77..9509369fa2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -73,6 +73,7 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; @@ -397,7 +398,7 @@ public class StreamThread extends Thread implements ProcessingThread { final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext; - final Logger log = logContext.logger(StreamThread.class); + final Logger log = LoggerFactory.getLogger(StreamThread.class); final ReferenceContainer referenceContainer = new ReferenceContainer(); referenceContainer.adminClient = adminClient; @@ -405,7 +406,7 @@ public class StreamThread extends Thread implements ProcessingThread { referenceContainer.time = time; referenceContainer.clientTags = config.getClientTags(); - log.info("Creating restore consumer client"); + log.info("Creating restore consumer client for thread {}", threadId); final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); @@ -434,7 +435,7 @@ public class StreamThread extends Thread implements ProcessingThread { threadId, threadIdx, processId, - log, + logContext, stateUpdaterEnabled, proceessingThreadsEnabled ); @@ -445,10 +446,10 @@ public class StreamThread extends Thread implements ProcessingThread { stateDirectory, changelogReader, threadId, - log, + logContext, stateUpdaterEnabled); - final Tasks tasks = new Tasks(new LogContext(logPrefix)); + final Tasks tasks = new Tasks(logContext); final boolean processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); @@ -483,7 +484,7 @@ public class StreamThread extends Thread implements ProcessingThread { ); referenceContainer.taskManager = taskManager; - log.info("Creating consumer client"); + log.info("Creating consumer client for thread {}", threadId); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx); consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); @@ -494,7 +495,7 @@ public class StreamThread extends Thread implements ProcessingThread { consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs); + final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs); taskManager.setMainConsumer(mainConsumerSetup.mainConsumer); referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer; @@ -535,6 +536,7 @@ public class StreamThread extends Thread implements ProcessingThread { final KafkaClientSupplier clientSupplier, final UUID processId, final Logger log, + final String threadId, final Map consumerConfigs) { if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { if (topologyMetadata.hasNamedTopologies()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 17eb27bb3d8..362c32592ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -279,7 +279,7 @@ public class ActiveTaskCreatorTest { "clientId-StreamThread-0", 0, uuid, - new LogContext().logger(ActiveTaskCreator.class), + new LogContext(), false, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2d56ed6f402..a6ab8d8209a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -117,7 +117,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.slf4j.Logger; import java.io.File; import java.io.IOException; @@ -4139,7 +4138,6 @@ public class StreamThreadTest { // TODO: change return type to `StandbyTask` private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); - final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( @@ -4149,7 +4147,7 @@ public class StreamThreadTest { stateDirectory, new MockChangelogReader(), CLIENT_ID, - log, + logContext, false); return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet())); }