mirror of https://github.com/apache/kafka.git
HOTFIX: StoreChangelogReader should require stable consumer group (#18901)
Fixing regression bug, introduced by beac86f049
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
parent
c4ea05f684
commit
7a749b589f
|
@ -744,7 +744,11 @@ public class StoreChangelogReader implements ChangelogReader {
|
||||||
.requireStable(true);
|
.requireStable(true);
|
||||||
final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
|
final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
|
||||||
.topicPartitions(new ArrayList<>(partitions));
|
.topicPartitions(new ArrayList<>(partitions));
|
||||||
final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
|
final Map<TopicPartition, Long> committedOffsets =
|
||||||
|
adminClient.listConsumerGroupOffsets(
|
||||||
|
Collections.singletonMap(groupId, spec),
|
||||||
|
options
|
||||||
|
)
|
||||||
.partitionsToOffsetAndMetadata(groupId).get().entrySet()
|
.partitionsToOffsetAndMetadata(groupId).get().entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
|
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
|
||||||
|
|
Loading…
Reference in New Issue