mirror of https://github.com/apache/kafka.git
HOTFIX: StoreChangelogReader should require stable consumer group (#18901)
Fixing regression bug, introduced by beac86f049
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
parent
c4ea05f684
commit
7a749b589f
|
@ -741,10 +741,14 @@ public class StoreChangelogReader implements ChangelogReader {
|
|||
try {
|
||||
// those which do not have a committed offset would default to 0
|
||||
final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
|
||||
.requireStable(true);
|
||||
.requireStable(true);
|
||||
final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
|
||||
.topicPartitions(new ArrayList<>(partitions));
|
||||
final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
|
||||
.topicPartitions(new ArrayList<>(partitions));
|
||||
final Map<TopicPartition, Long> committedOffsets =
|
||||
adminClient.listConsumerGroupOffsets(
|
||||
Collections.singletonMap(groupId, spec),
|
||||
options
|
||||
)
|
||||
.partitionsToOffsetAndMetadata(groupId).get().entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
|
||||
|
|
Loading…
Reference in New Issue