HOTFIX: StoreChangelogReader should require stable consumer group (#18901)

Fixing regression bug, introduced by beac86f049

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-17 12:53:13 -08:00
parent c4ea05f684
commit 7a749b589f
1 changed files with 7 additions and 3 deletions

View File

@ -744,7 +744,11 @@ public class StoreChangelogReader implements ChangelogReader {
.requireStable(true);
final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
.topicPartitions(new ArrayList<>(partitions));
final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
final Map<TopicPartition, Long> committedOffsets =
adminClient.listConsumerGroupOffsets(
Collections.singletonMap(groupId, spec),
options
)
.partitionsToOffsetAndMetadata(groupId).get().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));