MINOR: Convert streams group options to consumer group options in Admin APIs (#19583)

This PR is fixing the issue introduced in #19120
The input `StreamsGroup`-options must not be ignored, but it must be
converted to `ConsumerGroup`-options.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Alieh Saeedi 2025-05-06 13:26:56 +02:00 committed by GitHub
parent d2bd68d50c
commit 54b3b3debc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 5 deletions

View File

@ -3779,7 +3779,10 @@ public class KafkaAdminClient extends AdminClient {
Map.Entry::getKey,
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
));
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
ListConsumerGroupOffsetsOptions consumerGroupOptions = new ListConsumerGroupOffsetsOptions()
.requireStable(options.requireStable())
.timeoutMs(options.timeoutMs());
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, consumerGroupOptions));
}
@Override
@ -3794,7 +3797,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()));
DeleteConsumerGroupsOptions consumerGroupOptions = new DeleteConsumerGroupsOptions()
.timeoutMs(options.timeoutMs());
return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, consumerGroupOptions));
}
@Override
@ -3814,7 +3819,9 @@ public class KafkaAdminClient extends AdminClient {
String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options) {
return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions()));
DeleteConsumerGroupOffsetsOptions consumerGroupOptions = new DeleteConsumerGroupOffsetsOptions()
.timeoutMs(options.timeoutMs());
return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, consumerGroupOptions));
}
@Override
@ -4273,7 +4280,9 @@ public class KafkaAdminClient extends AdminClient {
Map<TopicPartition, OffsetAndMetadata> offsets,
AlterStreamsGroupOffsetsOptions options
) {
return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions()));
AlterConsumerGroupOffsetsOptions consumerGroupOptions = new AlterConsumerGroupOffsetsOptions()
.timeoutMs(options.timeoutMs());
return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, consumerGroupOptions));
}
@Override

View File

@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
* Options for {@link Admin#listStreamsGroupOffsets(java.util.Map, ListStreamsGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/