From eeafe0a101eca851745508d3a440003fb8108de7 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 22 Sep 2025 09:54:50 -0700 Subject: [PATCH] MINOR: fix incorrect offset reset logging (#20558) We need to only pass in the reset strategy, as the `logMessage` parameter was removed. Reviewers: Chia-Ping Tsai , Lucas Brutschy --- .../streams/processor/internals/StreamThread.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index ad66a822e72..f7f3676d24f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1524,6 +1524,7 @@ public class StreamThread extends Thread implements ProcessingThread { try { records = mainConsumer.poll(pollTime); } catch (final InvalidOffsetException e) { + log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size()); resetOffsets(e.partitions(), e); } @@ -1598,14 +1599,14 @@ public class StreamThread extends Thread implements ProcessingThread { addToResetList( partition, seekToBeginning, - "Setting topic '{}' to consume from earliest offset", + "Setting topic '{}' to consume from 'earliest' offset", loggedTopics ); } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { addToResetList( partition, seekToEnd, - "Setting topic '{}' to consume from latest offset", + "Setting topic '{}' to consume from 'latest' offset", loggedTopics ); } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { @@ -1613,7 +1614,7 @@ public class StreamThread extends Thread implements ProcessingThread { partition, seekByDuration, resetPolicy.duration().get(), - "Setting topic '{}' to consume from by_duration:{}", + "Setting topic '{}' to consume from 'by_duration:{}'", resetPolicy.duration().get().toString(), loggedTopics ); @@ -1729,12 +1730,12 @@ public class StreamThread extends Thread implements ProcessingThread { private void addToResetList( final TopicPartition partition, final Set partitions, - final String resetPolicy, + final String logMessage, final Set loggedTopics ) { final String topic = partition.topic(); if (loggedTopics.add(topic)) { - log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy); + log.info(logMessage, topic); } partitions.add(partition); }