KAFKA-18629: ShareGroupDeleteState admin client impl. (#18928)

* In this PR, we add the various infrastructure classes needed to support the
`deleteShareGroups` functionality in the admin client, as well as the
implementation of `kafka-share-groups.sh --delete`.
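
For illustration, here is a minimal sketch of how the new admin API added in this PR might be called from client code. The bootstrap address, group id and timeout below are placeholders, not part of this change:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;

public class DeleteShareGroupsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // Request deletion of one share group, bounding the wait at 30 seconds.
            DeleteShareGroupsResult result = admin.deleteShareGroups(
                List.of("my-share-group"),                        // hypothetical group id
                new DeleteShareGroupsOptions().timeoutMs(30_000));

            // all() completes only if every requested deletion succeeded.
            result.all().get();
        }
    }
}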

Reviewers: Andrew Schofield <aschofield@confluent.io>
Sushant Mahajan 2025-02-22 21:51:10 +05:30 committed by GitHub
parent 6e45ab7d84
commit 3fc103b48b
18 changed files with 886 additions and 251 deletions

View File

@@ -1841,6 +1841,23 @@ public interface Admin extends AutoCloseable {
return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
}
/**
* Delete share groups from the cluster with the default options.
*
* @param groupIds Collection of share group ids which are to be deleted.
* @return The DeleteShareGroupsResult.
*/
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
}
/**
* Delete share groups from the cluster.
*
* @param groupIds Collection of share group ids which are to be deleted.
* @param options The options to use when deleting a share group.
* @return The DeleteShareGroupsResult.
*/
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);
/**
* Describe some classic groups in the cluster.
*

View File

@@ -14,42 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
* The result of the {@link Admin#deleteConsumerGroups(Collection<String>, DeleteConsumerGroupsOptions)} call.
*/
@InterfaceStability.Evolving
public class DeleteConsumerGroupsResult {
private final Map<String, KafkaFuture<Void>> futures;
DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
this.futures = futures;
}
/**
* Return a map from group id to futures which can be used to check the status of
* individual deletions.
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
deletedGroups.putAll(futures);
return deletedGroups;
}
/**
* Return a future which succeeds only if all the consumer group deletions succeed.
*/
public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0]));
public class DeleteConsumerGroupsResult extends DeleteGroupsResult {
public DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
super(futures);
}
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* The base class for the results of the {@link Admin#deleteConsumerGroups(Collection)} and
* {@link Admin#deleteShareGroups(Collection)} calls.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public abstract class DeleteGroupsResult {
private final Map<String, KafkaFuture<Void>> futures;
DeleteGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
this.futures = futures;
}
/**
* Return a map from group id to futures which can be used to check the status of
* individual deletions.
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
deletedGroups.putAll(futures);
return deletedGroups;
}
/**
* Return a future which succeeds only if all the group deletions succeed.
*/
public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0]));
}
}
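
As an illustration (not part of this change), the helper below walks the per-group futures returned by deletedGroups() and reports each outcome individually instead of failing fast on all(); the class and method names are hypothetical:

import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.DeleteGroupsResult;
import org.apache.kafka.common.KafkaFuture;

public final class DeletionReporter {
    public static void report(DeleteGroupsResult result) {
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.deletedGroups().entrySet()) {
            try {
                entry.getValue().get();                          // completes normally on success
                System.out.println("Deleted group " + entry.getKey());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {                     // the cause carries the per-group error
                System.err.println("Failed to delete group " + entry.getKey() + ": " + e.getCause());
            }
        }
    }
}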

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}
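
Although the class body is empty, it inherits the generic request options from AbstractOptions, in particular timeoutMs, which the command-line tooling later in this change uses to bound the admin call. A small sketch, with an illustrative timeout value and a hypothetical class name:

import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;

public class DeleteShareGroupsOptionsSketch {
    public static DeleteShareGroupsOptions bounded() {
        // timeoutMs(...) comes from AbstractOptions; the empty subclass adds nothing yet.
        return new DeleteShareGroupsOptions().timeoutMs(30_000);
    }
}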

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collection;
import java.util.Map;
/**
* The result of the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
*/
public class DeleteShareGroupsResult extends DeleteGroupsResult {
DeleteShareGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
super(futures);
}
}

View File

@@ -308,6 +308,11 @@ public class ForwardingAdmin implements Admin {
return delegate.listShareGroupOffsets(groupSpecs, options);
}
@Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
return delegate.deleteShareGroups(groupIds, options);
}
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);

View File

@@ -50,6 +50,7 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
@@ -3829,6 +3830,16 @@ public class KafkaAdminClient extends AdminClient {
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Void> future =
DeleteShareGroupsHandler.newFuture(groupIds);
DeleteShareGroupsHandler handler = new DeleteShareGroupsHandler(logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteShareGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());

View File

@@ -16,37 +16,14 @@
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, Void> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
public class DeleteConsumerGroupsHandler extends DeleteGroupsHandler {
public DeleteConsumerGroupsHandler(
LogContext logContext
) {
this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
super(logContext, DeleteConsumerGroupsHandler.class);
}
@Override
@@ -55,92 +32,7 @@ public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched<Coordin
}
@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
public String displayName() {
return "DeleteConsumerGroups";
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> newFuture(
Collection<String> groupIds
) {
return AdminApiFuture.forKeys(buildKeySet(groupIds));
}
private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
@Override
public DeleteGroupsRequest.Builder buildBatchedRequest(
int coordinatorId,
Set<CoordinatorKey> keys
) {
List<String> groupIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
DeleteGroupsRequestData data = new DeleteGroupsRequestData()
.setGroupsNames(groupIds);
return new DeleteGroupsRequest.Builder(data);
}
@Override
public ApiResult<CoordinatorKey, Void> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
final Map<CoordinatorKey, Void> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
completed.put(groupIdKey, null);
}
return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
}
private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DeleteConsumerGroups` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DeleteConsumerGroups` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}
}

View File

@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class DeleteGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, Void> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
public DeleteGroupsHandler(
LogContext logContext,
Class<?> loggerClass
) {
this.log = logContext.logger(loggerClass);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
public abstract String apiName();
public abstract String displayName();
@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> newFuture(
Collection<String> groupIds
) {
return AdminApiFuture.forKeys(buildKeySet(groupIds));
}
private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
@Override
public DeleteGroupsRequest.Builder buildBatchedRequest(
int coordinatorId,
Set<CoordinatorKey> keys
) {
List<String> groupIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
DeleteGroupsRequestData data = new DeleteGroupsRequestData()
.setGroupsNames(groupIds);
return new DeleteGroupsRequest.Builder(data);
}
@Override
public ApiResult<CoordinatorKey, Void> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
final Map<CoordinatorKey, Void> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (DeleteGroupsResponseData.DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
completed.put(groupIdKey, null);
}
return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
}
private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
log.debug("`{}` request for group id {} failed due to error {}", displayName(), groupId.idValue, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", displayName(), groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", displayName(), groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
log.error("`{}` request for group id {} failed due to unexpected error {}", displayName(), groupId.idValue, error);
failed.put(groupId, error.exception());
}
}
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.utils.LogContext;
public class DeleteShareGroupsHandler extends DeleteGroupsHandler {
public DeleteShareGroupsHandler(
LogContext logContext
) {
super(logContext, DeleteShareGroupsHandler.class);
}
@Override
public String apiName() {
return "deleteShareGroups";
}
@Override
public String displayName() {
return "DeleteShareGroups";
}
}

View File

@@ -1399,6 +1399,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@@ -14,117 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
public class DeleteConsumerGroupsHandlerTest {
private final LogContext logContext = new LogContext();
private final String groupId1 = "group-id1";
@Test
public void testBuildRequest() {
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build();
assertEquals(1, request.data().groupsNames().size());
assertEquals(groupId1, request.data().groupsNames().get(0));
}
@Test
public void testSuccessfulHandleResponse() {
assertCompleted(handleWithError(Errors.NONE));
}
@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
}
@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}
@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP));
}
private DeleteGroupsResponse buildResponse(Errors error) {
return new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(new DeletableGroupResultCollection(singletonList(
new DeletableGroupResult()
.setErrorCode(error.code())
.setGroupId(groupId1)).iterator())));
}
private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
Errors error
) {
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
DeleteGroupsResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId1)), response);
}
private void assertUnmapped(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)), result.unmappedKeys);
}
private void assertRetriable(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
}
private void assertCompleted(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
}
private void assertFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.failedKeys.keySet());
assertInstanceOf(expectedExceptionType, result.failedKeys.get(key));
public class DeleteConsumerGroupsHandlerTest extends DeleteGroupsHandlerTest {
protected DeleteGroupsHandler handler() {
return new DeleteConsumerGroupsHandler(new LogContext());
}
}

View File

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
public abstract class DeleteGroupsHandlerTest {
private final String groupId1 = "group-id1";
private DeleteGroupsHandler handler;
@BeforeEach
public void setup() {
handler = handler();
}
protected abstract DeleteGroupsHandler handler();
@Test
public void testBuildRequest() {
DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build();
assertEquals(1, request.data().groupsNames().size());
assertEquals(groupId1, request.data().groupsNames().get(0));
}
@Test
public void testSuccessfulHandleResponse() {
assertCompleted(handleWithError(Errors.NONE));
}
@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
}
@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}
@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP));
}
private DeleteGroupsResponse buildResponse(Errors error) {
return new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(new DeletableGroupResultCollection(singletonList(
new DeletableGroupResult()
.setErrorCode(error.code())
.setGroupId(groupId1)).iterator())));
}
private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
Errors error
) {
DeleteGroupsResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId1)), response);
}
private void assertUnmapped(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)), result.unmappedKeys);
}
private void assertRetriable(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
}
private void assertCompleted(
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
}
private void assertFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Void> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.failedKeys.keySet());
assertInstanceOf(expectedExceptionType, result.failedKeys.get(key));
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.utils.LogContext;
public class DeleteShareGroupsHandlerTest extends DeleteGroupsHandlerTest {
protected DeleteGroupsHandler handler() {
return new DeleteShareGroupsHandler(new LogContext());
}
}

View File

@@ -55,6 +55,8 @@ import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
@@ -428,6 +430,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.listShareGroupOffsets(groupSpecs, options);
}
@Override
public DeleteShareGroupsResult deleteShareGroups(final Collection<String> groupIds, final DeleteShareGroupsOptions options) {
return adminDelegate.deleteShareGroups(groupIds, options);
}
@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds, final DescribeClassicGroupsOptions options) {
return adminDelegate.describeClassicGroups(groupIds, options);

View File

@@ -17,7 +17,9 @@
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
@@ -30,6 +32,7 @@ import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -40,6 +43,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -81,7 +85,7 @@ public class ShareGroupCommand {
} else if (opts.options.has(opts.describeOpt)) {
shareGroupService.describeGroups();
} else if (opts.options.has(opts.deleteOpt)) {
throw new UnsupportedOperationException("--delete option is not yet implemented");
shareGroupService.deleteShareGroups();
} else if (opts.options.has(opts.resetOffsetsOpt)) {
throw new UnsupportedOperationException("--reset-offsets option is not yet implemented");
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
@@ -154,6 +158,18 @@
}
}
List<GroupListing> listDetailedShareGroups() {
try {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.SHARE)));
Collection<GroupListing> listings = result.all().get();
return listings.stream().toList();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
List<GroupListing> listShareGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
@@ -208,6 +224,72 @@
}
}
Map<String, Throwable> deleteShareGroups() {
List<GroupListing> shareGroupIds = listDetailedShareGroups();
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
? shareGroupIds.stream().map(GroupListing::groupId).toList()
: opts.options.valuesOf(opts.groupOpt);
// Pre admin call checks
LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
Map<String, Exception> errGroups = new HashMap<>();
for (String groupId : groupIdSet) {
Optional<GroupListing> listing = shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
if (listing.isEmpty()) {
errGroups.put(groupId, new IllegalArgumentException("Group '" + groupId + "' is not a share group."));
} else {
Optional<GroupState> groupState = listing.get().groupState();
groupState.ifPresent(state -> {
if (state == GroupState.DEAD) {
errGroups.put(groupId, new IllegalStateException("Share group '" + groupId + "' is in state DEAD."));
} else if (state != GroupState.EMPTY) {
errGroups.put(groupId, new GroupNotEmptyException("Share group '" + groupId + "' is not EMPTY."));
}
});
}
}
groupIdSet.removeAll(errGroups.keySet());
Map<String, KafkaFuture<Void>> groupsToDelete = groupIdSet.isEmpty() ? Map.of() : adminClient.deleteShareGroups(
groupIdSet.stream().toList(),
withTimeoutMs(new DeleteShareGroupsOptions())
).deletedGroups();
Map<String, Throwable> success = new HashMap<>();
Map<String, Throwable> failed = new HashMap<>(errGroups);
groupsToDelete.forEach((g, f) -> {
try {
f.get();
success.put(g, null);
} catch (InterruptedException ie) {
failed.put(g, ie);
} catch (ExecutionException e) {
failed.put(g, e.getCause());
}
});
if (failed.isEmpty())
System.out.println("Deletion of requested share groups (" + success.keySet().stream().map(group -> "'" + group + "'").collect(Collectors.joining(", ")) + ") was successful.");
else {
printError("Deletion of some share groups failed:", Optional.empty());
failed.forEach((group, error) -> System.out.println("* Share group '" + group + "' could not be deleted due to: " + error));
if (!success.isEmpty())
System.out.println("\nThese share groups were deleted successfully: " + success.keySet().stream().map(group -> "'" + group + "'").collect(Collectors.joining(",")));
}
failed.putAll(success);
return failed;
}
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
return options.timeoutMs(t);
}
Map<String, ShareGroupDescription> describeShareGroups(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> res = new HashMap<>();
Map<String, KafkaFuture<ShareGroupDescription>> stringKafkaFutureMap = adminClient.describeShareGroups(

View File

@@ -173,9 +173,12 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
}
if (options.has(deleteOpt)) {
if (!options.has(groupOpt))
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOpt + " takes the option: " + groupOpt);
String.format("Option %s takes the options %s or %s", deleteOpt, groupOpt, allGroupsOpt));
if (options.has(allGroupsOpt) && options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
String.format("Option %s takes either %s or %s, not both.", deleteOpt, groupOpt, allGroupsOpt));
if (options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOpt + " does not take the option: " + topicOpt);

View File

@@ -16,8 +16,10 @@
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
@@ -36,11 +38,14 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
@@ -48,7 +53,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -56,6 +63,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import joptsimple.OptionException;
@@ -63,11 +71,15 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ShareGroupCommandTest {
@@ -76,6 +88,18 @@ public class ShareGroupCommandTest {
private static final List<List<String>> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose"));
private static final List<List<String>> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
@BeforeEach
public void setup() {
// nothing by default
Exit.setExitProcedure(((statusCode, message) -> {
}));
}
@AfterEach
public void teardown() {
Exit.resetExitProcedure();
}
@Test
public void testListShareGroups() throws Exception {
String firstGroup = "first-group";
@@ -588,8 +612,260 @@
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,"));
}
@Test
public void testDeleteShareGroupsArgs() {
String bootstrapServer = "localhost:9092";
Admin adminClient = mock(KafkaAdminClient.class);
mockListShareGroups(adminClient, new LinkedHashMap<>());
// no group spec args
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete"};
AtomicBoolean exited = new AtomicBoolean(false);
Exit.setExitProcedure(((statusCode, message) -> {
assertNotEquals(0, statusCode);
assertTrue(message.contains("Option [delete] takes the options [group] or [all-groups]"));
exited.set(true);
}));
try {
getShareGroupService(cgcArgs, adminClient);
} finally {
assertTrue(exited.get());
}
}
@Test
public void testDeleteShareGroupsSuccess() {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
firstGroup, KafkaFuture.completedFuture(null),
secondGroup, KafkaFuture.completedFuture(null)
);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.EMPTY);
shareGroupMap.put(secondGroup, GroupState.EMPTY);
mockListShareGroups(adminClient, shareGroupMap);
when(result.deletedGroups()).thenReturn(deletedGroups);
Map<String, Throwable> expectedResults = new HashMap<>();
expectedResults.put(firstGroup, null);
expectedResults.put(secondGroup, null);
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
@Test
public void testDeleteShareGroupsAllGroupsSuccess() {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--all-groups"};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
firstGroup, KafkaFuture.completedFuture(null),
secondGroup, KafkaFuture.completedFuture(null)
);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.EMPTY);
shareGroupMap.put(secondGroup, GroupState.EMPTY);
mockListShareGroups(adminClient, shareGroupMap);
when(result.deletedGroups()).thenReturn(deletedGroups);
Map<String, Throwable> expectedResults = new HashMap<>();
expectedResults.put(firstGroup, null);
expectedResults.put(secondGroup, null);
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
@Test
public void testDeleteShareGroupsAllGroupsPartialFail() {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--all-groups"};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
future1.complete(null);
Exception exp = new Exception("bad");
future2.completeExceptionally(exp);
Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
firstGroup, future1,
secondGroup, future2
);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.EMPTY);
shareGroupMap.put(secondGroup, GroupState.EMPTY);
mockListShareGroups(adminClient, shareGroupMap);
when(result.deletedGroups()).thenReturn(deletedGroups);
Map<String, Throwable> expectedResults = new HashMap<>();
expectedResults.put(firstGroup, null);
expectedResults.put(secondGroup, exp);
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
@Test
public void testDeleteShareGroupsDeleteFailure() {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.EMPTY);
shareGroupMap.put(secondGroup, GroupState.EMPTY);
mockListShareGroups(adminClient, shareGroupMap);
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
Exception exp = new Exception("bad");
future.completeExceptionally(exp);
Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
firstGroup, future,
secondGroup, future
);
when(result.deletedGroups()).thenReturn(deletedGroups);
Map<String, Throwable> expectedResults = new HashMap<>();
expectedResults.put(firstGroup, exp);
expectedResults.put(secondGroup, exp);
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
@Test
public void testDeleteShareGroupsFailureNonShareGroup() {
String firstGroup = "first-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
mockListShareGroups(adminClient, new LinkedHashMap<>());
when(result.deletedGroups()).thenReturn(Map.of());
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
service.deleteShareGroups();
verify(result, times(0)).deletedGroups();
verify(adminClient, times(0)).deleteShareGroups(anyList());
}
}
@Test
public void testDeleteShareGroupsFailureNonEmptyGroup() {
String firstGroup = "first-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.STABLE);
mockListShareGroups(adminClient, shareGroupMap);
when(result.deletedGroups()).thenReturn(Map.of());
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
service.deleteShareGroups();
verify(result, times(0)).deletedGroups();
verify(adminClient, times(0)).deleteShareGroups(anyList());
}
}
@Test
public void testDeleteShareGroupsPartialFailure() {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup};
Admin adminClient = mock(KafkaAdminClient.class);
DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
LinkedHashMap<String, GroupState> shareGroupMap = new LinkedHashMap<>();
shareGroupMap.put(firstGroup, GroupState.EMPTY);
shareGroupMap.put(secondGroup, GroupState.EMPTY);
mockListShareGroups(adminClient, shareGroupMap);
KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
future1.complete(null);
Exception exp = new Exception("bad");
future2.completeExceptionally(exp);
Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
firstGroup, future1,
secondGroup, future2
);
when(result.deletedGroups()).thenReturn(deletedGroups);
when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result);
Map<String, Throwable> expectedResults = new HashMap<>();
expectedResults.put(firstGroup, null);
expectedResults.put(secondGroup, exp);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
private void mockListShareGroups(Admin client, LinkedHashMap<String, GroupState> groupIds) {
ListGroupsResult listResult = mock(ListGroupsResult.class);
KafkaFutureImpl<Collection<GroupListing>> listFuture = new KafkaFutureImpl<>();
List<GroupListing> groupListings = new ArrayList<>();
groupIds.forEach((groupId, state) -> groupListings.add(
new GroupListing(groupId, Optional.of(GroupType.SHARE), "share", Optional.of(state))
));
listFuture.complete(groupListings);
when(listResult.all()).thenReturn(listFuture);
when(client.listGroups(any())).thenReturn(listResult);
}
ShareGroupService getShareGroupService(String[] args, Admin adminClient) {
ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
opts.checkArgs();
return new ShareGroupService(opts, adminClient);
}