mirror of https://github.com/apache/kafka.git
MINOR: fix incorrect offset reset logging (#20558)
We need to only pass in the reset strategy, as the `logMessage` parameter was removed. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
f16d1f3c9d
commit
71efb89290
|
@ -1532,6 +1532,7 @@ public class StreamThread extends Thread implements ProcessingThread {
|
||||||
try {
|
try {
|
||||||
records = mainConsumer.poll(pollTime);
|
records = mainConsumer.poll(pollTime);
|
||||||
} catch (final InvalidOffsetException e) {
|
} catch (final InvalidOffsetException e) {
|
||||||
|
log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size());
|
||||||
resetOffsets(e.partitions(), e);
|
resetOffsets(e.partitions(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1647,14 +1648,14 @@ public class StreamThread extends Thread implements ProcessingThread {
|
||||||
addToResetList(
|
addToResetList(
|
||||||
partition,
|
partition,
|
||||||
seekToBeginning,
|
seekToBeginning,
|
||||||
"Setting topic '{}' to consume from earliest offset",
|
"Setting topic '{}' to consume from 'earliest' offset",
|
||||||
loggedTopics
|
loggedTopics
|
||||||
);
|
);
|
||||||
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
|
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
|
||||||
addToResetList(
|
addToResetList(
|
||||||
partition,
|
partition,
|
||||||
seekToEnd,
|
seekToEnd,
|
||||||
"Setting topic '{}' to consume from latest offset",
|
"Setting topic '{}' to consume from 'latest' offset",
|
||||||
loggedTopics
|
loggedTopics
|
||||||
);
|
);
|
||||||
} else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
|
} else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
|
||||||
|
@ -1662,7 +1663,7 @@ public class StreamThread extends Thread implements ProcessingThread {
|
||||||
partition,
|
partition,
|
||||||
seekByDuration,
|
seekByDuration,
|
||||||
resetPolicy.duration().get(),
|
resetPolicy.duration().get(),
|
||||||
"Setting topic '{}' to consume from by_duration:{}",
|
"Setting topic '{}' to consume from 'by_duration:{}'",
|
||||||
resetPolicy.duration().get().toString(),
|
resetPolicy.duration().get().toString(),
|
||||||
loggedTopics
|
loggedTopics
|
||||||
);
|
);
|
||||||
|
@ -1778,12 +1779,12 @@ public class StreamThread extends Thread implements ProcessingThread {
|
||||||
private void addToResetList(
|
private void addToResetList(
|
||||||
final TopicPartition partition,
|
final TopicPartition partition,
|
||||||
final Set<TopicPartition> partitions,
|
final Set<TopicPartition> partitions,
|
||||||
final String resetPolicy,
|
final String logMessage,
|
||||||
final Set<String> loggedTopics
|
final Set<String> loggedTopics
|
||||||
) {
|
) {
|
||||||
final String topic = partition.topic();
|
final String topic = partition.topic();
|
||||||
if (loggedTopics.add(topic)) {
|
if (loggedTopics.add(topic)) {
|
||||||
log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy);
|
log.info(logMessage, topic);
|
||||||
}
|
}
|
||||||
partitions.add(partition);
|
partitions.add(partition);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue