Revert "KAFKA-18887: Implement Streams Admin APIs (#19049)"

This reverts commit 017692e86c.
This commit is contained in:
David Arthur 2025-03-05 10:49:11 -05:00
parent 485699a187
commit d86cb59790
16 changed files with 6 additions and 1459 deletions

View File

@ -940,31 +940,6 @@ public interface Admin extends AutoCloseable {
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}
/**
* List the Streams group offsets available in the cluster for the specified Streams groups.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#listConsumerGroupOffsets} does.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the Streams group offsets.
* @return The ListStreamsGroupOffsetsResult
*/
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);
/**
* List the Streams group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListStreamsGroupOffsetsResult.
*/
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
}
/**
* Delete consumer groups from the cluster.
*
@ -982,25 +957,6 @@ public interface Admin extends AutoCloseable {
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}
/**
* Delete Streams groups from the cluster.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroups} does.
*
* @param options The options to use when deleting a Streams group.
* @return The DeleteStreamsGroupsResult.
*/
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);
/**
* Delete Streams groups from the cluster with the default options.
*
* @return The DeleteStreamsGroupResult.
*/
default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
}
/**
* Delete committed offsets for a set of partitions in a consumer group. This will
* succeed at the partition level only if the group is not actively subscribed
@ -1024,31 +980,6 @@ public interface Admin extends AutoCloseable {
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
}
/**
* Delete committed offsets for a set of partitions in a Streams group. This will
* succeed at the partition level only if the group is not actively subscribed
* to the corresponding topic.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroupOffsets} does.
*
* @param options The options to use when deleting offsets in a Streams group.
* @return The DeleteStreamsGroupOffsetsResult.
*/
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options);
/**
* Delete committed offsets for a set of partitions in a Streams group with the default
* options. This will succeed at the partition level only if the group is not actively
* subscribed to the corresponding topic.
*
* @return The DeleteStreamsGroupOffsetsResult.
*/
default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteStreamsGroupOffsets(groupId, partitions, new DeleteStreamsGroupOffsetsOptions());
}
/**
* List the groups available in the cluster with the default options.
*
@ -1282,34 +1213,6 @@ public interface Admin extends AutoCloseable {
*/
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata.
* @return The AlterOffsetsResult.
*/
default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
}
/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#alterConsumerGroupOffsets} does.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterOffsetsResult.
*/
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options);
/**
* <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
* the beginning offset, end offset as well as the offset matching a timestamp in partitions.

View File

@ -74,7 +74,7 @@ public class AlterConsumerGroupOffsetsResult {
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering group offsets for the following partitions: " + partitionsFailed);
"Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
}
}
return null;

View File

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for the {@link Admin#alterStreamsGroupOffsets(String groupId, Map), AlterStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
* The result of the {@link AdminClient#alterStreamsGroupOffsets(String, Map)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsResult {
private final AlterConsumerGroupOffsetsResult delegate;
AlterStreamsGroupOffsetsResult(final AlterConsumerGroupOffsetsResult delegate) {
this.delegate = delegate;
}
/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
return delegate.partitionResult(partition);
}
/**
* Return a future which succeeds if all the alter offsets succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Set;
/**
* Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;
import java.util.Map;
import java.util.Set;
/**
* The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsResult {
private final DeleteConsumerGroupOffsetsResult delegate;
DeleteStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
delegate = new DeleteConsumerGroupOffsetsResult(future, partitions);
}
DeleteStreamsGroupOffsetsResult(final DeleteConsumerGroupOffsetsResult delegate) {
this.delegate = delegate;
}
/**
* Return a future which succeeds only if all the deletions succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}
/**
* Return a future which can be used to check the result for a given topic.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition topicPartition) {
return delegate.partitionResult(topicPartition);
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
/**
* The result of the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupsResult {
private final DeleteConsumerGroupsResult delegate;
DeleteStreamsGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
delegate = new DeleteConsumerGroupsResult(futures);
}
DeleteStreamsGroupsResult(final DeleteConsumerGroupsResult delegate) {
this.delegate = delegate;
}
/**
* Return a future which succeeds only if all the deletions succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}
/**
* Return a map from group id to futures which can be used to check the status of individual deletions.
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
return delegate.deletedGroups();
}
}

View File

@ -168,31 +168,16 @@ public class ForwardingAdmin implements Admin {
return delegate.listConsumerGroupOffsets(groupSpecs, options);
}
@Override
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) {
return delegate.listStreamsGroupOffsets(groupSpecs, options);
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
return delegate.deleteConsumerGroups(groupIds, options);
}
@Override
public DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
return delegate.deleteStreamsGroups(groupIds, options);
}
@Override
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) {
return delegate.deleteConsumerGroupOffsets(groupId, partitions, options);
}
@Override
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteStreamsGroupOffsetsOptions options) {
return delegate.deleteStreamsGroupOffsets(groupId, partitions, options);
}
@Override
public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions, ElectLeadersOptions options) {
return delegate.electLeaders(electionType, partitions, options);
@ -218,11 +203,6 @@ public class ForwardingAdmin implements Admin {
return delegate.alterConsumerGroupOffsets(groupId, offsets, options);
}
@Override
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options) {
return delegate.alterStreamsGroupOffsets(groupId, offsets, options);
}
@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
return delegate.listOffsets(topicPartitionOffsets, options);

View File

@ -3766,17 +3766,6 @@ public class KafkaAdminClient extends AdminClient {
return new ListConsumerGroupOffsetsResult(future.all());
}
@Override
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
ListStreamsGroupOffsetsOptions options) {
Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = groupSpecs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
));
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Void> future =
@ -3787,11 +3776,6 @@ public class KafkaAdminClient extends AdminClient {
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
public DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()));
}
@Override
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
String groupId,
@ -3804,14 +3788,6 @@ public class KafkaAdminClient extends AdminClient {
return new DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
}
@Override
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(
String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options) {
return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions()));
}
@Override
public DescribeShareGroupsResult describeShareGroups(final Collection<String> groupIds,
final DescribeShareGroupsOptions options) {
@ -4243,15 +4219,6 @@ public class KafkaAdminClient extends AdminClient {
return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
}
@Override
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(
String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
AlterStreamsGroupOffsetsOptions options
) {
return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions()));
}
@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
ListOffsetsOptions options) {

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsOptions extends AbstractOptions<ListStreamsGroupOffsetsOptions> {
private boolean requireStable = false;
/**
* Sets an optional requireStable flag.
*/
public ListStreamsGroupOffsetsOptions requireStable(final boolean requireStable) {
this.requireStable = requireStable;
return this;
}
public boolean requireStable() {
return requireStable;
}
}

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
* The result of the {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsResult {
private final ListConsumerGroupOffsetsResult delegate;
ListStreamsGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
delegate = new ListConsumerGroupOffsetsResult(futures);
}
ListStreamsGroupOffsetsResult(final ListConsumerGroupOffsetsResult delegate) {
this.delegate = delegate;
}
/**
* Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
return delegate.all();
}
/**
* Return a future which yields a map of topic partitions to offsets for the specified group.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
return delegate.partitionsToOffsetAndMetadata(groupId);
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Specification of Streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsSpec {
private Collection<TopicPartition> topicPartitions;
/**
* Set the topic partitions whose offsets are to be listed for a Streams group.
*/
ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}
/**
* Returns the topic partitions whose offsets are to be listed for a Streams group.
*/
Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}
}

View File

@ -756,36 +756,16 @@ public class MockAdminClient extends AdminClient {
return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
@Override
public synchronized ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) {
Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = groupSpecs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
));
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
}
@Override
public synchronized DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteStreamsGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized ElectLeadersResult electLeaders(
ElectionType electionType,
@ -1214,11 +1194,6 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implement yet");
}
@Override
public synchronized AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implement yet");
}
@Override
public synchronized ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();

View File

@ -37,8 +37,6 @@ import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
import org.apache.kafka.clients.admin.CreateAclsOptions;
@ -59,10 +57,6 @@ import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
@ -120,9 +114,6 @@ import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
@ -299,31 +290,16 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.listConsumerGroupOffsets(groupSpecs, options);
}
@Override
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(final Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, final ListStreamsGroupOffsetsOptions options) {
return adminDelegate.listStreamsGroupOffsets(groupSpecs, options);
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(final Collection<String> groupIds, final DeleteConsumerGroupsOptions options) {
return adminDelegate.deleteConsumerGroups(groupIds, options);
}
@Override
public DeleteStreamsGroupsResult deleteStreamsGroups(final Collection<String> groupIds, final DeleteStreamsGroupsOptions options) {
return adminDelegate.deleteStreamsGroups(groupIds, options);
}
@Override
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteConsumerGroupOffsetsOptions options) {
return adminDelegate.deleteConsumerGroupOffsets(groupId, partitions, options);
}
@Override
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteStreamsGroupOffsetsOptions options) {
return adminDelegate.deleteStreamsGroupOffsets(groupId, partitions, options);
}
@Override
public ElectLeadersResult electLeaders(final ElectionType electionType, final Set<TopicPartition> partitions, final ElectLeadersOptions options) {
return adminDelegate.electLeaders(electionType, partitions, options);
@ -349,11 +325,6 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.alterConsumerGroupOffsets(groupId, offsets, options);
}
@Override
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(final String groupId, final Map<TopicPartition, OffsetAndMetadata> offsets, final AlterStreamsGroupOffsetsOptions options) {
return adminDelegate.alterStreamsGroupOffsets(groupId, offsets, options);
}
@Override
public ListOffsetsResult listOffsets(final Map<TopicPartition, OffsetSpec> topicPartitionOffsets, final ListOffsetsOptions options) {
return adminDelegate.listOffsets(topicPartitionOffsets, options);