mirror of https://github.com/apache/kafka.git
KAFKA-18887: Implement Streams Admin APIs (#19120)
Implement Admin API extensions beyond list/describe group (delete group, offset-related APIs). * adds methods for describing and manipulating offsets, as described in KIP-1071 * adds corresponding unit tests These are doing the exact same thing as the corresponding consumer group counter-parts. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
7fcee6f459
commit
7a976c651e
|
@ -940,6 +940,31 @@ public interface Admin extends AutoCloseable {
|
|||
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* List the Streams group offsets available in the cluster for the specified Streams groups.
|
||||
*
|
||||
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#listConsumerGroupOffsets} does.
|
||||
*
|
||||
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
|
||||
*
|
||||
* @param options The options to use when listing the Streams group offsets.
|
||||
* @return The ListStreamsGroupOffsetsResult
|
||||
*/
|
||||
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);
|
||||
|
||||
/**
|
||||
* List the Streams group offsets available in the cluster for the specified groups with the default options.
|
||||
* <p>
|
||||
* This is a convenience method for
|
||||
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
|
||||
*
|
||||
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
|
||||
* @return The ListStreamsGroupOffsetsResult.
|
||||
*/
|
||||
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
|
||||
return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete consumer groups from the cluster.
|
||||
*
|
||||
|
@ -957,6 +982,25 @@ public interface Admin extends AutoCloseable {
|
|||
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete Streams groups from the cluster.
|
||||
*
|
||||
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroups} does.
|
||||
*
|
||||
* @param options The options to use when deleting a Streams group.
|
||||
* @return The DeleteStreamsGroupsResult.
|
||||
*/
|
||||
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);
|
||||
|
||||
/**
|
||||
* Delete Streams groups from the cluster with the default options.
|
||||
*
|
||||
* @return The DeleteStreamsGroupResult.
|
||||
*/
|
||||
default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
|
||||
return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete committed offsets for a set of partitions in a consumer group. This will
|
||||
* succeed at the partition level only if the group is not actively subscribed
|
||||
|
@ -980,6 +1024,31 @@ public interface Admin extends AutoCloseable {
|
|||
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete committed offsets for a set of partitions in a Streams group. This will
|
||||
* succeed at the partition level only if the group is not actively subscribed
|
||||
* to the corresponding topic.
|
||||
*
|
||||
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroupOffsets} does.
|
||||
*
|
||||
* @param options The options to use when deleting offsets in a Streams group.
|
||||
* @return The DeleteStreamsGroupOffsetsResult.
|
||||
*/
|
||||
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
|
||||
Set<TopicPartition> partitions,
|
||||
DeleteStreamsGroupOffsetsOptions options);
|
||||
|
||||
/**
|
||||
* Delete committed offsets for a set of partitions in a Streams group with the default
|
||||
* options. This will succeed at the partition level only if the group is not actively
|
||||
* subscribed to the corresponding topic.
|
||||
*
|
||||
* @return The DeleteStreamsGroupOffsetsResult.
|
||||
*/
|
||||
default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions) {
|
||||
return deleteStreamsGroupOffsets(groupId, partitions, new DeleteStreamsGroupOffsetsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* List the groups available in the cluster with the default options.
|
||||
*
|
||||
|
@ -1213,6 +1282,34 @@ public interface Admin extends AutoCloseable {
|
|||
*/
|
||||
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
|
||||
|
||||
/**
|
||||
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
|
||||
*
|
||||
* <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
|
||||
* See the overload for more details.
|
||||
*
|
||||
* @param groupId The group for which to alter offsets.
|
||||
* @param offsets A map of offsets by partition with associated metadata.
|
||||
* @return The AlterOffsetsResult.
|
||||
*/
|
||||
default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
|
||||
return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
|
||||
*
|
||||
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
|
||||
*
|
||||
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#alterConsumerGroupOffsets} does.
|
||||
*
|
||||
* @param groupId The group for which to alter offsets.
|
||||
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
|
||||
* @param options The options to use when altering the offsets.
|
||||
* @return The AlterOffsetsResult.
|
||||
*/
|
||||
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options);
|
||||
|
||||
/**
|
||||
* <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
|
||||
* the beginning offset, end offset as well as the offset matching a timestamp in partitions.
|
||||
|
|
|
@ -74,7 +74,7 @@ public class AlterConsumerGroupOffsetsResult {
|
|||
for (Errors error : topicPartitionErrorsMap.values()) {
|
||||
if (error != Errors.NONE) {
|
||||
throw error.exception(
|
||||
"Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
|
||||
"Failed altering group offsets for the following partitions: " + partitionsFailed);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Options for the {@link Admin#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The result of the {@link AdminClient#alterStreamsGroupOffsets(String, Map)} call.
|
||||
*
|
||||
* The API of this class is evolving, see {@link AdminClient} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class AlterStreamsGroupOffsetsResult {
|
||||
|
||||
private final AlterConsumerGroupOffsetsResult delegate;
|
||||
|
||||
AlterStreamsGroupOffsetsResult(final AlterConsumerGroupOffsetsResult delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which can be used to check the result for a given partition.
|
||||
*/
|
||||
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
|
||||
return delegate.partitionResult(partition);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which succeeds if all the alter offsets succeed.
|
||||
*/
|
||||
public KafkaFuture<Void> all() {
|
||||
return delegate.all();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DeleteStreamsGroupOffsetsResult {
|
||||
private final DeleteConsumerGroupOffsetsResult delegate;
|
||||
|
||||
DeleteStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
|
||||
delegate = new DeleteConsumerGroupOffsetsResult(future, partitions);
|
||||
}
|
||||
|
||||
DeleteStreamsGroupOffsetsResult(final DeleteConsumerGroupOffsetsResult delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which succeeds only if all the deletions succeed.
|
||||
*/
|
||||
public KafkaFuture<Void> all() {
|
||||
return delegate.all();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which can be used to check the result for a given topic.
|
||||
*/
|
||||
public KafkaFuture<Void> partitionResult(final TopicPartition topicPartition) {
|
||||
return delegate.partitionResult(topicPartition);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DeleteStreamsGroupsResult {
|
||||
|
||||
private final DeleteConsumerGroupsResult delegate;
|
||||
|
||||
DeleteStreamsGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
|
||||
delegate = new DeleteConsumerGroupsResult(futures);
|
||||
}
|
||||
|
||||
DeleteStreamsGroupsResult(final DeleteConsumerGroupsResult delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which succeeds only if all the deletions succeed.
|
||||
*/
|
||||
public KafkaFuture<Void> all() {
|
||||
return delegate.all();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map from group id to futures which can be used to check the status of individual deletions.
|
||||
*/
|
||||
public Map<String, KafkaFuture<Void>> deletedGroups() {
|
||||
return delegate.deletedGroups();
|
||||
}
|
||||
}
|
|
@ -168,16 +168,31 @@ public class ForwardingAdmin implements Admin {
|
|||
return delegate.listConsumerGroupOffsets(groupSpecs, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) {
|
||||
return delegate.listStreamsGroupOffsets(groupSpecs, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
|
||||
return delegate.deleteConsumerGroups(groupIds, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
|
||||
return delegate.deleteStreamsGroups(groupIds, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) {
|
||||
return delegate.deleteConsumerGroupOffsets(groupId, partitions, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteStreamsGroupOffsetsOptions options) {
|
||||
return delegate.deleteStreamsGroupOffsets(groupId, partitions, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions, ElectLeadersOptions options) {
|
||||
return delegate.electLeaders(electionType, partitions, options);
|
||||
|
@ -203,6 +218,11 @@ public class ForwardingAdmin implements Admin {
|
|||
return delegate.alterConsumerGroupOffsets(groupId, offsets, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options) {
|
||||
return delegate.alterStreamsGroupOffsets(groupId, offsets, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
|
||||
return delegate.listOffsets(topicPartitionOffsets, options);
|
||||
|
|
|
@ -3766,6 +3766,17 @@ public class KafkaAdminClient extends AdminClient {
|
|||
return new ListConsumerGroupOffsetsResult(future.all());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
|
||||
ListStreamsGroupOffsetsOptions options) {
|
||||
Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = groupSpecs.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
|
||||
));
|
||||
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
|
||||
SimpleAdminApiFuture<CoordinatorKey, Void> future =
|
||||
|
@ -3776,6 +3787,11 @@ public class KafkaAdminClient extends AdminClient {
|
|||
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
|
||||
return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
|
||||
String groupId,
|
||||
|
@ -3788,6 +3804,14 @@ public class KafkaAdminClient extends AdminClient {
|
|||
return new DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(
|
||||
String groupId,
|
||||
Set<TopicPartition> partitions,
|
||||
DeleteStreamsGroupOffsetsOptions options) {
|
||||
return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeShareGroupsResult describeShareGroups(final Collection<String> groupIds,
|
||||
final DescribeShareGroupsOptions options) {
|
||||
|
@ -4219,6 +4243,15 @@ public class KafkaAdminClient extends AdminClient {
|
|||
return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(
|
||||
String groupId,
|
||||
Map<TopicPartition, OffsetAndMetadata> offsets,
|
||||
AlterStreamsGroupOffsetsOptions options
|
||||
) {
|
||||
return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
|
||||
ListOffsetsOptions options) {
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
|
||||
/**
|
||||
* Options for {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class ListStreamsGroupOffsetsOptions extends AbstractOptions<ListStreamsGroupOffsetsOptions> {
|
||||
|
||||
private boolean requireStable = false;
|
||||
|
||||
/**
|
||||
* Sets an optional requireStable flag.
|
||||
*/
|
||||
public ListStreamsGroupOffsetsOptions requireStable(final boolean requireStable) {
|
||||
this.requireStable = requireStable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean requireStable() {
|
||||
return requireStable;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class ListStreamsGroupOffsetsResult {
|
||||
private final ListConsumerGroupOffsetsResult delegate;
|
||||
|
||||
ListStreamsGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
|
||||
delegate = new ListConsumerGroupOffsetsResult(futures);
|
||||
}
|
||||
|
||||
ListStreamsGroupOffsetsResult(final ListConsumerGroupOffsetsResult delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which yields all {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>} objects, if requests for all the groups succeed.
|
||||
*/
|
||||
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
|
||||
return delegate.all();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which yields a map of topic partitions to offsets for the specified group.
|
||||
*/
|
||||
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
|
||||
return delegate.partitionsToOffsetAndMetadata(groupId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Specification of Streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class ListStreamsGroupOffsetsSpec {
|
||||
|
||||
private Collection<TopicPartition> topicPartitions;
|
||||
|
||||
/**
|
||||
* Set the topic partitions whose offsets are to be listed for a Streams group.
|
||||
*/
|
||||
ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
|
||||
this.topicPartitions = topicPartitions;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the topic partitions whose offsets are to be listed for a Streams group.
|
||||
*/
|
||||
Collection<TopicPartition> topicPartitions() {
|
||||
return topicPartitions;
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -756,16 +756,36 @@ public class MockAdminClient extends AdminClient {
|
|||
return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) {
|
||||
Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = groupSpecs.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
|
||||
));
|
||||
return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteStreamsGroupOffsetsOptions options) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ElectLeadersResult electLeaders(
|
||||
ElectionType electionType,
|
||||
|
@ -1194,6 +1214,11 @@ public class MockAdminClient extends AdminClient {
|
|||
throw new UnsupportedOperationException("Not implement yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options) {
|
||||
throw new UnsupportedOperationException("Not implement yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
|
||||
Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
|
|||
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
|
||||
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
|
||||
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsOptions;
|
||||
import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
|
||||
import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
|
||||
import org.apache.kafka.clients.admin.CreateAclsOptions;
|
||||
|
@ -57,6 +59,10 @@ import org.apache.kafka.clients.admin.DeleteRecordsOptions;
|
|||
import org.apache.kafka.clients.admin.DeleteRecordsResult;
|
||||
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
|
||||
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
|
||||
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
|
||||
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
|
||||
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
|
||||
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
|
||||
import org.apache.kafka.clients.admin.DeleteTopicsResult;
|
||||
import org.apache.kafka.clients.admin.DescribeAclsOptions;
|
||||
|
@ -114,6 +120,9 @@ import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
|
|||
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
|
||||
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
|
||||
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
|
||||
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
|
||||
import org.apache.kafka.clients.admin.ListTopicsOptions;
|
||||
import org.apache.kafka.clients.admin.ListTopicsResult;
|
||||
import org.apache.kafka.clients.admin.ListTransactionsOptions;
|
||||
|
@ -290,16 +299,31 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
|
|||
return adminDelegate.listConsumerGroupOffsets(groupSpecs, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(final Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, final ListStreamsGroupOffsetsOptions options) {
|
||||
return adminDelegate.listStreamsGroupOffsets(groupSpecs, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupsResult deleteConsumerGroups(final Collection<String> groupIds, final DeleteConsumerGroupsOptions options) {
|
||||
return adminDelegate.deleteConsumerGroups(groupIds, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupsResult deleteStreamsGroups(final Collection<String> groupIds, final DeleteStreamsGroupsOptions options) {
|
||||
return adminDelegate.deleteStreamsGroups(groupIds, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteConsumerGroupOffsetsOptions options) {
|
||||
return adminDelegate.deleteConsumerGroupOffsets(groupId, partitions, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteStreamsGroupOffsetsOptions options) {
|
||||
return adminDelegate.deleteStreamsGroupOffsets(groupId, partitions, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElectLeadersResult electLeaders(final ElectionType electionType, final Set<TopicPartition> partitions, final ElectLeadersOptions options) {
|
||||
return adminDelegate.electLeaders(electionType, partitions, options);
|
||||
|
@ -325,6 +349,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
|
|||
return adminDelegate.alterConsumerGroupOffsets(groupId, offsets, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(final String groupId, final Map<TopicPartition, OffsetAndMetadata> offsets, final AlterStreamsGroupOffsetsOptions options) {
|
||||
return adminDelegate.alterStreamsGroupOffsets(groupId, offsets, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListOffsetsResult listOffsets(final Map<TopicPartition, OffsetSpec> topicPartitionOffsets, final ListOffsetsOptions options) {
|
||||
return adminDelegate.listOffsets(topicPartitionOffsets, options);
|
||||
|
|
Loading…
Reference in New Issue