KAFKA-17744: Improve state updater logs when restorating state (#17714)

The logs for Kafka Streams local state restoration incorrectly refer to the StreamThread instead of the StateUpdater thread, which is responsible for decoupling the restoration process. The restore consumer also references StreamThread instead of StateUpdater. 

This commit corrects the log message for more clarity.

Reviewer: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Sebastien Viale 2024-11-12 17:20:44 +01:00 committed by GitHub
parent a696b4d6f4
commit c8f360c5f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 7 additions and 4 deletions

View File

@ -369,10 +369,15 @@ public class StreamThread extends Thread implements ProcessingThread {
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext;
final Logger log = logContext.logger(StreamThread.class);
final ReferenceContainer referenceContainer = new ReferenceContainer();
@ -382,13 +387,13 @@ public class StreamThread extends Thread implements ProcessingThread {
referenceContainer.clientTags = config.getClientTags();
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId));
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
time,
config,
logContext,
restorationLogContext,
adminClient,
restoreConsumer,
userStateRestoreListener,
@ -397,7 +402,6 @@ public class StreamThread extends Thread implements ProcessingThread {
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
topologyMetadata,
@ -475,7 +479,6 @@ public class StreamThread extends Thread implements ProcessingThread {
taskManager.setMainConsumer(mainConsumer);
referenceContainer.mainConsumer = mainConsumer;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
streamsMetrics.metricsRegistry().addReporter(reporter);