MINOR: fix incorrect offset reset logging (#20558)

We need to only pass in the reset strategy, as the `logMessage`
parameter was removed.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lucas Brutschy
 <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-09-22 09:54:50 -07:00
parent 93c4dc1d27
commit eeafe0a101
1 changed files with 6 additions and 5 deletions

View File

@ -1524,6 +1524,7 @@ public class StreamThread extends Thread implements ProcessingThread {
try { try {
records = mainConsumer.poll(pollTime); records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) { } catch (final InvalidOffsetException e) {
log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size());
resetOffsets(e.partitions(), e); resetOffsets(e.partitions(), e);
} }
@ -1598,14 +1599,14 @@ public class StreamThread extends Thread implements ProcessingThread {
addToResetList( addToResetList(
partition, partition,
seekToBeginning, seekToBeginning,
"Setting topic '{}' to consume from earliest offset", "Setting topic '{}' to consume from 'earliest' offset",
loggedTopics loggedTopics
); );
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) { } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
addToResetList( addToResetList(
partition, partition,
seekToEnd, seekToEnd,
"Setting topic '{}' to consume from latest offset", "Setting topic '{}' to consume from 'latest' offset",
loggedTopics loggedTopics
); );
} else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { } else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
@ -1613,7 +1614,7 @@ public class StreamThread extends Thread implements ProcessingThread {
partition, partition,
seekByDuration, seekByDuration,
resetPolicy.duration().get(), resetPolicy.duration().get(),
"Setting topic '{}' to consume from by_duration:{}", "Setting topic '{}' to consume from 'by_duration:{}'",
resetPolicy.duration().get().toString(), resetPolicy.duration().get().toString(),
loggedTopics loggedTopics
); );
@ -1729,12 +1730,12 @@ public class StreamThread extends Thread implements ProcessingThread {
private void addToResetList( private void addToResetList(
final TopicPartition partition, final TopicPartition partition,
final Set<TopicPartition> partitions, final Set<TopicPartition> partitions,
final String resetPolicy, final String logMessage,
final Set<String> loggedTopics final Set<String> loggedTopics
) { ) {
final String topic = partition.topic(); final String topic = partition.topic();
if (loggedTopics.add(topic)) { if (loggedTopics.add(topic)) {
log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy); log.info(logMessage, topic);
} }
partitions.add(partition); partitions.add(partition);
} }