KAFKA-6058: KIP-222; Add Consumer Group operations to Admin API

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API

Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com>
Author: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Guozhang Wang <wangguoz@gmail.com>

Closes #4454 from jeqo/feature/admin-client-describe-consumer-group
This commit is contained in:
Jorge Quilcate Otoya 2018-04-11 14:17:46 -07:00 committed by Guozhang Wang
parent 47918f2d79
commit 6a99da87ab
19 changed files with 1290 additions and 2 deletions

View File

@ -105,7 +105,7 @@
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
<property name="max" value="20"/>
<property name="max" value="25"/>
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
@ -114,7 +114,7 @@
<module name="ClassFanOutComplexity">
<!-- default is 20 -->
<property name="max" value="40"/>
<property name="max" value="50"/>
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->

View File

@ -164,6 +164,8 @@
<subpackage name="admin">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.consumer.internals" />
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
</subpackage>

View File

@ -689,4 +689,83 @@ public abstract class AdminClient implements AutoCloseable {
*/
public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
/**
* Describe some group IDs in the cluster.
*
* @param groupIds The IDs of the groups to describe.
* @param options The options to use when describing the groups.
* @return The DescribeConsumerGroupResult.
*/
public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
DescribeConsumerGroupsOptions options);
/**
* Describe some group IDs in the cluster, with the default options.
* <p>
* This is a convenience method for
* #{@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
* default options. See the overload for more details.
*
* @param groupIds The IDs of the groups to describe.
* @return The DescribeConsumerGroupResult.
*/
public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
}
/**
* List the consumer groups available in the cluster.
*
* @param options The options to use when listing the consumer groups.
* @return The ListGroupsResult.
*/
public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
/**
* List the consumer groups available in the cluster with the default options.
*
* This is a convenience method for #{@link AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
* See the overload for more details.
*
* @return The ListGroupsResult.
*/
public ListConsumerGroupsResult listConsumerGroups() {
return listConsumerGroups(new ListConsumerGroupsOptions());
}
/**
* List the consumer group offsets available in the cluster.
*
* @param options The options to use when listing the consumer group offsets.
* @return The ListGroupOffsetsResult
*/
public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
/**
* List the consumer group offsets available in the cluster with the default options.
*
* This is a convenience method for #{@link AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
*
* @return The ListGroupOffsetsResult.
*/
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}
/**
* Delete consumer groups from the cluster.
*
* @param options The options to use when deleting a consumer group.
* @return The DeletConsumerGroupResult.
*/
public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
/**
* Delete consumer groups from the cluster with the default options.
*
* @return The DeleteConsumerGroupResult.
*/
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
/**
* A detailed description of a single consumer group in the cluster.
*/
public class ConsumerGroupDescription {
private final String groupId;
private final boolean isSimpleConsumerGroup;
private final List<MemberDescription> members;
private final String partitionAssignor;
/**
* Creates an instance with the specified parameters.
*
* @param groupId The consumer group id
* @param isSimpleConsumerGroup If Consumer Group is simple
* @param members The consumer group members
* @param partitionAssignor The consumer group partition assignor
*/
public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) {
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members;
this.partitionAssignor = partitionAssignor;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConsumerGroupDescription that = (ConsumerGroupDescription) o;
if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false;
if (members != null ? !members.equals(that.members) : that.members != null) return false;
return partitionAssignor != null ? partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == null;
}
@Override
public int hashCode() {
int result = groupId != null ? groupId.hashCode() : 0;
result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
result = 31 * result + (members != null ? members.hashCode() : 0);
result = 31 * result + (partitionAssignor != null ? partitionAssignor.hashCode() : 0);
return result;
}
/**
* The id of the consumer group.
*/
public String groupId() {
return groupId;
}
/**
* If consumer group is simple or not.
*/
public boolean isSimpleConsumerGroup() {
return isSimpleConsumerGroup;
}
/**
* A list of the members of the consumer group.
*/
public List<MemberDescription> members() {
return members;
}
/**
* The consumer group partition assignor.
*/
public String partitionAssignor() {
return partitionAssignor;
}
@Override
public String toString() {
return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", members=" +
Utils.join(members, ",") + ", partitionAssignor=" + partitionAssignor + ")";
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
/**
* A listing of a consumer group in the cluster.
*/
public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) {
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
}
/**
* Consumer Group Id
*/
public String groupId() {
return groupId;
}
/**
* If Consumer Group is simple or not.
*/
public boolean isSimpleConsumerGroup() {
return isSimpleConsumerGroup;
}
@Override
public String toString() {
return "(" +
"groupId='" + groupId + '\'' +
", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
')';
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for the {@link AdminClient#deleteConsumerGroups(Collection)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
/**
* The result of the {@link AdminClient#deleteConsumerGroups(Collection)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DeleteConsumerGroupsResult {
final KafkaFuture<Map<String, KafkaFuture<Void>>> futures;
DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> futures) {
this.futures = futures;
}
public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() {
return futures;
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}.
* <p>
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
/**
* The result of the {@link KafkaAdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DescribeConsumerGroupsResult {
private final KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures;
public DescribeConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures) {
this.futures = futures;
}
/**
* Return a map from group name to futures which can be used to check the description of a consumer group.
*/
public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> describedGroups() {
return futures;
}
}

View File

@ -27,6 +27,9 @@ import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@ -79,6 +82,8 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
@ -97,6 +102,14 @@ import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.Resource;
import org.apache.kafka.common.requests.ResourceType;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
@ -105,9 +118,11 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -2209,4 +2224,325 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(tokensFuture);
}
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options) {
final KafkaFutureImpl<Map<String, KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>();
final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> consumerGroupFutures = new HashMap<>(groupIds.size());
final ArrayList<String> groupIdList = new ArrayList<>();
for (String groupId : groupIds) {
if (!consumerGroupFutures.containsKey(groupId)) {
consumerGroupFutures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
groupIdList.add(groupId);
}
}
for (final String groupId : groupIdList) {
final long nowFindCoordinator = time.milliseconds();
final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
final long nowDescribeConsumerGroups = time.milliseconds();
final int nodeId = response.node().id();
runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DescribeGroupsRequest.Builder(groupIdList);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
// Handle server responses for particular groupId.
for (Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : consumerGroupFutures.entrySet()) {
final String groupId = entry.getKey();
final KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue();
final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
final Errors groupError = groupMetadata.error();
if (groupError != Errors.NONE) {
future.completeExceptionally(groupError.exception());
continue;
}
final String protocolType = groupMetadata.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
final List<MemberDescription> consumers = new ArrayList<>(members.size());
for (DescribeGroupsResponse.GroupMember groupMember : members) {
final PartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
final MemberDescription memberDescription =
new MemberDescription(
groupMember.memberId(),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(assignment.partitions()));
consumers.add(memberDescription);
}
final String protocol = groupMetadata.protocol();
final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
future.complete(consumerGroupDescription);
}
}
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(consumerGroupFutures.values(), throwable);
}
}, nowDescribeConsumerGroups);
resultFutures.complete(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures));
}
@Override
void handleFailure(Throwable throwable) {
resultFutures.completeExceptionally(throwable);
}
}, nowFindCoordinator);
}
return new DescribeConsumerGroupsResult(resultFutures);
}
@Override
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
//final KafkaFutureImpl<Map<Node, KafkaFuture<Collection<ConsumerGroupListing>>>> nodeAndConsumerGroupListing = new KafkaFutureImpl<>();
final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<Collection<ConsumerGroupListing>>();
final long nowMetadata = time.milliseconds();
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>();
for (final Node node : metadataResponse.brokers()) {
futures.put(node, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
}
future.combine(futures.values().toArray(new KafkaFuture[0])).thenApply(
new KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {
@Override
public Collection<ConsumerGroupListing> apply(Collection<ConsumerGroupListing> v) {
List<ConsumerGroupListing> listings = new ArrayList<>();
for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
Collection<ConsumerGroupListing> results;
try {
results = entry.getValue().get();
} catch (Throwable e) {
// This should be unreachable, since the future returned by KafkaFuture#allOf should
// have failed if any Future failed.
throw new KafkaException("ListConsumerGroupsResult#listings(): internal error", e);
}
listings.addAll(results);
}
return listings;
}
});
for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
final long nowList = time.milliseconds();
final int brokerId = entry.getKey().id();
runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new ListGroupsRequest.Builder();
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
for (ListGroupsResponse.Group group : response.groups()) {
if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
final String groupId = group.groupId();
final String protocolType = group.protocolType();
final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
groupsListing.add(groupListing);
}
}
future.complete(groupsListing);
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable);
}
}, nowList);
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
}, nowMetadata);
return new ListConsumerGroupsResult(future);
}
@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>();
final long nowFindCoordinator = time.milliseconds();
final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
final long nowListConsumerGroupOffsets = time.milliseconds();
final int nodeId = response.node().id();
runnable.call(new Call("listConsumerGroupOffsets", deadline, new ConstantNodeIdProvider(nodeId)) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new OffsetFetchRequest.Builder(groupId, options.topicPartitions());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
response.responseData().entrySet()) {
final TopicPartition topicPartition = entry.getKey();
final Long offset = entry.getValue().offset;
final String metadata = entry.getValue().metadata;
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata));
}
groupOffsetListingFuture.complete(groupOffsetsListing);
}
@Override
void handleFailure(Throwable throwable) {
groupOffsetListingFuture.completeExceptionally(throwable);
}
}, nowListConsumerGroupOffsets);
}
@Override
void handleFailure(Throwable throwable) {
groupOffsetListingFuture.completeExceptionally(throwable);
}
}, nowFindCoordinator);
return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> deleteConsumerGroupsFuture = new KafkaFutureImpl<>();
final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = new HashMap<>(groupIds.size());
final Set<String> groupIdList = new HashSet<>();
for (String groupId : groupIds) {
if (!deleteConsumerGroupFutures.containsKey(groupId)) {
deleteConsumerGroupFutures.put(groupId, new KafkaFutureImpl<Void>());
groupIdList.add(groupId);
}
}
for (final String groupId : groupIdList) {
final long nowFindCoordinator = time.milliseconds();
final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
final long nowDeleteConsumerGroups = time.milliseconds();
final int nodeId = response.node().id();
runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
// Handle server responses for particular groupId.
for (Map.Entry<String, KafkaFutureImpl<Void>> entry : deleteConsumerGroupFutures.entrySet()) {
final String groupId = entry.getKey();
final KafkaFutureImpl<Void> future = entry.getValue();
final Errors groupError = response.get(groupId);
if (groupError != Errors.NONE) {
future.completeExceptionally(groupError.exception());
continue;
}
future.complete(null);
}
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable);
}
}, nowDeleteConsumerGroups);
deleteConsumerGroupsFuture.complete(new HashMap<String, KafkaFuture<Void>>(deleteConsumerGroupFutures));
}
@Override
void handleFailure(Throwable throwable) {
deleteConsumerGroupsFuture.completeExceptionally(throwable);
}
}, nowFindCoordinator);
}
return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture);
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
/**
* Options for {@link AdminClient#listConsumerGroupOffsets(String)}.
* <p>
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
private List<TopicPartition> topicPartitions = null;
/**
* Set the topic partitions to list as part of the result.
* {@code null} includes all topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListGroupOffsetsOptions
*/
public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}
/**
* Returns a list of topic partitions to add as part of the result.
*/
public List<TopicPartition> topicPartitions() {
return topicPartitions;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
* The result of the {@link AdminClient#listConsumerGroupOffsets(String)} call.
* <p>
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
this.future = future;
}
/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
return future;
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link AdminClient#listConsumerGroups()}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* The result of the {@link AdminClient#listConsumerGroups()} call.
* <p>
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsResult {
private final KafkaFuture<Collection<ConsumerGroupListing>> future;
ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> future) {
this.future = future;
}
/**
* Return a future which yields a collection of ConsumerGroupListing objects.
*/
public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
return future;
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
/**
* A description of the assignments of a specific group member.
*/
public class MemberAssignment {
private final List<TopicPartition> topicPartitions;
/**
* Creates an instance with the specified parameters.
*
* @param topicPartitions List of topic partitions
*/
public MemberAssignment(List<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberAssignment that = (MemberAssignment) o;
return topicPartitions != null ? topicPartitions.equals(that.topicPartitions) : that.topicPartitions == null;
}
@Override
public int hashCode() {
return topicPartitions != null ? topicPartitions.hashCode() : 0;
}
/**
* The topic partitions assigned to a group member.
*/
public List<TopicPartition> topicPartitions() {
return topicPartitions;
}
@Override
public String toString() {
return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
/**
* A detailed description of a single group instance in the cluster.
*/
public class MemberDescription {
private final String memberId;
private final String clientId;
private final String host;
private final MemberAssignment assignment;
/**
* Creates an instance with the specified parameters.
*
* @param memberId The consumer id
* @param clientId The client id
* @param host The host
* @param assignment The assignment
*/
public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) {
this.memberId = memberId;
this.clientId = clientId;
this.host = host;
this.assignment = assignment;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberDescription that = (MemberDescription) o;
if (memberId != null ? !memberId.equals(that.memberId) : that.memberId != null) return false;
if (clientId != null ? !clientId.equals(that.clientId) : that.clientId != null) return false;
return assignment != null ? assignment.equals(that.assignment) : that.assignment == null;
}
@Override
public int hashCode() {
int result = memberId != null ? memberId.hashCode() : 0;
result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
result = 31 * result + (assignment != null ? assignment.hashCode() : 0);
return result;
}
/**
* The consumer id of the group member.
*/
public String consumerId() {
return memberId;
}
/**
* The client id of the group member.
*/
public String clientId() {
return clientId;
}
/**
* The host where the group member is running.
*/
public String host() {
return host;
}
/**
* The assignment of the group member.
*/
public MemberAssignment assignment() {
return assignment;
}
@Override
public String toString() {
return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + ", assignment=" +
assignment + ")";
}
}

View File

@ -106,6 +106,15 @@ public abstract class KafkaFuture<T> implements Future<T> {
return allOfFuture;
}
public KafkaFuture<T> combine(KafkaFuture<?>... futures) {
AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, this);
for (KafkaFuture<?> future : futures) {
future.addWaiter(allOfWaiter);
}
return this;
}
/**
* Returns a new KafkaFuture that, when this future completes normally, is executed with this
* futures's result as the argument to the supplied function.

View File

@ -18,6 +18,9 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
@ -47,10 +50,15 @@ import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.resource.ResourceType;
@ -64,6 +72,7 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -81,6 +90,7 @@ import static org.apache.kafka.common.requests.ResourceType.BROKER;
import static org.apache.kafka.common.requests.ResourceType.TOPIC;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -636,6 +646,188 @@ public class KafkaAdminClientTest {
}
}
//Ignoring test to be fixed on follow-up PR
@Ignore
@Test
public void testListConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(
new MetadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
new ArrayList<MetadataResponse.TopicMetadata>()));
env.kafkaClient().prepareResponse(
new ListGroupsResponse(
Errors.NONE,
Arrays.asList(
new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
new ListGroupsResponse.Group("group-connect-1", "connector")
)));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
final List<ConsumerGroupListing> consumerGroups = new ArrayList<>();
final KafkaFuture<Collection<ConsumerGroupListing>> listings = result.listings();
consumerGroups.addAll(listings.get());
assertEquals(1, consumerGroups.size());
}
}
@Test
public void testDescribeConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
final List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(0, myTopicPartition0);
topicPartitions.add(1, myTopicPartition1);
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
groupMetadataMap.put(
"group-0",
new DescribeGroupsResponse.GroupMetadata(
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
Arrays.asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
groupMetadataMap.put(
"group-connect-0",
new DescribeGroupsResponse.GroupMetadata(
Errors.NONE,
"",
"connect",
"",
Arrays.asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture = result.describedGroups().get().get("group-0");
final ConsumerGroupDescription groupDescription = groupDescriptionFuture.get();
assertEquals(1, result.describedGroups().get().size());
assertEquals("group-0", groupDescription.groupId());
assertEquals(2, groupDescription.members().size());
}
}
@Test
public void testDescribeConsumerGroupOffsets() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, "", Errors.NONE));
responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, "", Errors.NONE));
responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20, "", Errors.NONE));
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
assertEquals(3, result.partitionsToOffsetAndMetadata().get().size());
final TopicPartition topicPartition = result.partitionsToOffsetAndMetadata().get().keySet().iterator().next();
assertEquals("my_topic", topicPartition.topic());
final OffsetAndMetadata offsetAndMetadata = result.partitionsToOffsetAndMetadata().get().values().iterator().next();
assertEquals(10, offsetAndMetadata.offset());
}
}
@Test
public void testDeleteConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final List<String> groupIds = Collections.singletonList("group-0");
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, Errors> response = new HashMap<>();
response.put("group-0", Errors.NONE);
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
final Map<String, KafkaFuture<Void>> results = result.deletedGroups().get();
assertNull(results.get("group-0").get());
}
}
private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
for (T element : elements) {
assertTrue("Did not find " + element, collection.contains(element));

View File

@ -296,6 +296,26 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");