KAFKA-18066: Fix mismatched StreamThread ID in log messages (#19517)
CI / build (push) Waiting to run Details

This PR fixes an issue where the thread name shown in log messages did
not match the actual execution context. Previously, log entries
displayed the context of the newly created thread, while the logger
reflected the current executing thread. This mismatch led to confusion
and made log tracing more difficult.

Changes:
 - Use logger without context to not have context
 - Updated log messages to explicitly describe the thread being created
- Fixed instances where the log context reflected the current thread
instead of the newly created one
 
 Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Uladzislau Blok 2025-07-29 02:00:50 +02:00 committed by GitHub
parent de2adb69de
commit f9ccf83a7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 16 additions and 19 deletions

View File

@ -77,7 +77,7 @@ class ActiveTaskCreator {
final String threadId,
final int threadIdx,
final UUID processId,
final Logger log,
final LogContext logContext,
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
@ -91,15 +91,12 @@ class ActiveTaskCreator {
this.threadId = threadId;
this.threadIdx = threadIdx;
this.processId = processId;
this.log = log;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);
streamsProducer = new StreamsProducer(
producer(),
processingMode(applicationConfig),

View File

@ -52,20 +52,20 @@ class StandbyTaskCreator {
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final Logger log,
final LogContext logContext,
final boolean stateUpdaterEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.log = log;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
dummyCache = new ThreadCache(
new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
logContext,
0,
streamsMetrics
);

View File

@ -73,6 +73,7 @@ import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
@ -397,7 +398,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext;
final Logger log = logContext.logger(StreamThread.class);
final Logger log = LoggerFactory.getLogger(StreamThread.class);
final ReferenceContainer referenceContainer = new ReferenceContainer();
referenceContainer.adminClient = adminClient;
@ -405,7 +406,7 @@ public class StreamThread extends Thread implements ProcessingThread {
referenceContainer.time = time;
referenceContainer.clientTags = config.getClientTags();
log.info("Creating restore consumer client");
log.info("Creating restore consumer client for thread {}", threadId);
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
@ -434,7 +435,7 @@ public class StreamThread extends Thread implements ProcessingThread {
threadId,
threadIdx,
processId,
log,
logContext,
stateUpdaterEnabled,
proceessingThreadsEnabled
);
@ -445,10 +446,10 @@ public class StreamThread extends Thread implements ProcessingThread {
stateDirectory,
changelogReader,
threadId,
log,
logContext,
stateUpdaterEnabled);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final Tasks tasks = new Tasks(logContext);
final boolean processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
@ -483,7 +484,7 @@ public class StreamThread extends Thread implements ProcessingThread {
);
referenceContainer.taskManager = taskManager;
log.info("Creating consumer client");
log.info("Creating consumer client for thread {}", threadId);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx);
consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
@ -494,7 +495,7 @@ public class StreamThread extends Thread implements ProcessingThread {
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs);
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs);
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
@ -535,6 +536,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final KafkaClientSupplier clientSupplier,
final UUID processId,
final Logger log,
final String threadId,
final Map<String, Object> consumerConfigs) {
if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) {
if (topologyMetadata.hasNamedTopologies()) {

View File

@ -279,7 +279,7 @@ public class ActiveTaskCreatorTest {
"clientId-StreamThread-0",
0,
uuid,
new LogContext().logger(ActiveTaskCreator.class),
new LogContext(),
false,
false);

View File

@ -117,7 +117,6 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
@ -4139,7 +4138,6 @@ public class StreamThreadTest {
// TODO: change return type to `StandbyTask`
private Collection<Task> createStandbyTask(final StreamsConfig config) {
final LogContext logContext = new LogContext("test");
final Logger log = logContext.logger(StreamThreadTest.class);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
@ -4149,7 +4147,7 @@ public class StreamThreadTest {
stateDirectory,
new MockChangelogReader(),
CLIENT_ID,
log,
logContext,
false);
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
}