diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index b0248b43842..d7479b5ba1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1841,6 +1841,23 @@ public interface Admin extends AutoCloseable { return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions()); } + /** + * Delete share groups from the cluster with the default options. + * + * @return The DeleteShareGroupsResult. + */ + default DeleteShareGroupsResult deleteShareGroups(Collection groupIds) { + return deleteShareGroups(groupIds, new DeleteShareGroupsOptions()); + } + + /** + * Delete share groups from the cluster. + * + * @param options The options to use when deleting a share group. + * @return The DeleteShareGroupsResult. + */ + DeleteShareGroupsResult deleteShareGroups(Collection groupIds, DeleteShareGroupsOptions options); + /** * Describe some classic groups in the cluster. * diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java index 5a7ad396935..902dfd101c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java @@ -14,42 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; -import java.util.HashMap; import java.util.Map; /** - * The result of the {@link Admin#deleteConsumerGroups(Collection)} call. - * - * The API of this class is evolving, see {@link Admin} for details. + * The result of the {@link Admin#deleteConsumerGroups(Collection , DeleteConsumerGroupsOptions)} call. */ -@InterfaceStability.Evolving -public class DeleteConsumerGroupsResult { - private final Map> futures; - - DeleteConsumerGroupsResult(final Map> futures) { - this.futures = futures; - } - - /** - * Return a map from group id to futures which can be used to check the status of - * individual deletions. - */ - public Map> deletedGroups() { - Map> deletedGroups = new HashMap<>(futures.size()); - deletedGroups.putAll(futures); - return deletedGroups; - } - - /** - * Return a future which succeeds only if all the consumer group deletions succeed. - */ - public KafkaFuture all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); +public class DeleteConsumerGroupsResult extends DeleteGroupsResult { + public DeleteConsumerGroupsResult(final Map> futures) { + super(futures); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java new file mode 100644 index 00000000000..2be42025e04 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * The parent class of result of the {@link Admin#deleteConsumerGroups(Collection)}, + * {@link Admin#deleteShareGroups(Collection)} calls. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public abstract class DeleteGroupsResult { + private final Map> futures; + + DeleteGroupsResult(final Map> futures) { + this.futures = futures; + } + + /** + * Return a map from group id to futures which can be used to check the status of + * individual deletions. + */ + public Map> deletedGroups() { + Map> deletedGroups = new HashMap<>(futures.size()); + deletedGroups.putAll(futures); + return deletedGroups; + } + + /** + * Return a future which succeeds only if all the group deletions succeed. + */ + public KafkaFuture all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java new file mode 100644 index 00000000000..a41ec6d00b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for the {@link Admin#deleteShareGroups(Collection , DeleteShareGroupsOptions)} call. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DeleteShareGroupsOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java new file mode 100644 index 00000000000..8abd3e73263 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; + +import java.util.Collection; +import java.util.Map; + +/** + * The result of the {@link Admin#deleteShareGroups(Collection , DeleteShareGroupsOptions)} call. + */ +public class DeleteShareGroupsResult extends DeleteGroupsResult { + DeleteShareGroupsResult(final Map> futures) { + super(futures); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index b52688e7d3b..ee2ac0942a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -308,6 +308,11 @@ public class ForwardingAdmin implements Admin { return delegate.listShareGroupOffsets(groupSpecs, options); } + @Override + public DeleteShareGroupsResult deleteShareGroups(Collection groupIds, DeleteShareGroupsOptions options) { + return delegate.deleteShareGroups(groupIds, options); + } + @Override public ListGroupsResult listGroups(ListGroupsOptions options) { return delegate.listGroups(options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7c8842d4061..3b31efec17a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -50,6 +50,7 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; +import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeProducersHandler; @@ -3829,6 +3830,16 @@ public class KafkaAdminClient extends AdminClient { .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } + @Override + public DeleteShareGroupsResult deleteShareGroups(Collection groupIds, DeleteShareGroupsOptions options) { + SimpleAdminApiFuture future = + DeleteShareGroupsHandler.newFuture(groupIds); + DeleteShareGroupsHandler handler = new DeleteShareGroupsHandler(logContext); + invokeDriver(handler, future, options.timeoutMs); + return new DeleteShareGroupsResult(future.all().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + } + @Override public Map metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index 0d581243ddc..c3b248838f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -16,37 +16,14 @@ */ package org.apache.kafka.clients.admin.internals; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.message.DeleteGroupsRequestData; -import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.AbstractResponse; -import org.apache.kafka.common.requests.DeleteGroupsRequest; -import org.apache.kafka.common.requests.DeleteGroupsResponse; -import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.utils.LogContext; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched { - - private final Logger log; - private final AdminApiLookupStrategy lookupStrategy; +public class DeleteConsumerGroupsHandler extends DeleteGroupsHandler { public DeleteConsumerGroupsHandler( LogContext logContext ) { - this.log = logContext.logger(DeleteConsumerGroupsHandler.class); - this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + super(logContext, DeleteConsumerGroupsHandler.class); } @Override @@ -55,92 +32,7 @@ public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched lookupStrategy() { - return lookupStrategy; + public String displayName() { + return "DeleteConsumerGroups"; } - - public static AdminApiFuture.SimpleAdminApiFuture newFuture( - Collection groupIds - ) { - return AdminApiFuture.forKeys(buildKeySet(groupIds)); - } - - private static Set buildKeySet(Collection groupIds) { - return groupIds.stream() - .map(CoordinatorKey::byGroupId) - .collect(Collectors.toSet()); - } - - @Override - public DeleteGroupsRequest.Builder buildBatchedRequest( - int coordinatorId, - Set keys - ) { - List groupIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); - DeleteGroupsRequestData data = new DeleteGroupsRequestData() - .setGroupsNames(groupIds); - return new DeleteGroupsRequest.Builder(data); - } - - @Override - public ApiResult handleResponse( - Node coordinator, - Set groupIds, - AbstractResponse abstractResponse - ) { - final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; - final Map completed = new HashMap<>(); - final Map failed = new HashMap<>(); - final Set groupsToUnmap = new HashSet<>(); - - for (DeletableGroupResult deletedGroup : response.data().results()) { - CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId()); - Errors error = Errors.forCode(deletedGroup.errorCode()); - if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, groupsToUnmap); - continue; - } - - completed.put(groupIdKey, null); - } - - return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); - } - - private void handleError( - CoordinatorKey groupId, - Errors error, - Map failed, - Set groupsToUnmap - ) { - switch (error) { - case GROUP_AUTHORIZATION_FAILED: - case INVALID_GROUP_ID: - case NON_EMPTY_GROUP: - case GROUP_ID_NOT_FOUND: - log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId.idValue, error); - failed.put(groupId, error.exception()); - break; - - case COORDINATOR_LOAD_IN_PROGRESS: - // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`DeleteConsumerGroups` request for group id {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId.idValue); - break; - - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - // If the coordinator is unavailable or there was a coordinator change, then we unmap - // the key so that we retry the `FindCoordinator` request - log.debug("`DeleteConsumerGroups` request for group id {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId.idValue, error); - groupsToUnmap.add(groupId); - break; - - default: - log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error); - failed.put(groupId, error.exception()); - } - } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java new file mode 100644 index 00000000000..bf8b7a2210b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteGroupsRequest; +import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public abstract class DeleteGroupsHandler extends AdminApiHandler.Batched { + private final Logger log; + private final AdminApiLookupStrategy lookupStrategy; + + public DeleteGroupsHandler( + LogContext logContext, + Class loggerClass + ) { + this.log = logContext.logger(loggerClass); + this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext); + } + + public abstract String apiName(); + + public abstract String displayName(); + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Collection groupIds + ) { + return AdminApiFuture.forKeys(buildKeySet(groupIds)); + } + + private static Set buildKeySet(Collection groupIds) { + return groupIds.stream() + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()); + } + + @Override + public DeleteGroupsRequest.Builder buildBatchedRequest( + int coordinatorId, + Set keys + ) { + List groupIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); + DeleteGroupsRequestData data = new DeleteGroupsRequestData() + .setGroupsNames(groupIds); + return new DeleteGroupsRequest.Builder(data); + } + + @Override + public ApiResult handleResponse( + Node coordinator, + Set groupIds, + AbstractResponse abstractResponse + ) { + final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + for (DeleteGroupsResponseData.DeletableGroupResult deletedGroup : response.data().results()) { + CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId()); + Errors error = Errors.forCode(deletedGroup.errorCode()); + if (error != Errors.NONE) { + handleError(groupIdKey, error, failed, groupsToUnmap); + continue; + } + + completed.put(groupIdKey, null); + } + + return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); + } + + private void handleError( + CoordinatorKey groupId, + Errors error, + Map failed, + Set groupsToUnmap + ) { + switch (error) { + case GROUP_AUTHORIZATION_FAILED: + case INVALID_GROUP_ID: + case NON_EMPTY_GROUP: + case GROUP_ID_NOT_FOUND: + log.debug("`{}` request for group id {} failed due to error {}", displayName(), groupId.idValue, error); + failed.put(groupId, error.exception()); + break; + + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", displayName(), groupId.idValue); + break; + + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", displayName(), groupId.idValue, error); + groupsToUnmap.add(groupId); + break; + + default: + log.error("`{}` request for group id {} failed due to unexpected error {}", displayName(), groupId.idValue, error); + failed.put(groupId, error.exception()); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java new file mode 100644 index 00000000000..9b43509d415 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.utils.LogContext; + +public class DeleteShareGroupsHandler extends DeleteGroupsHandler { + public DeleteShareGroupsHandler( + LogContext logContext + ) { + super(logContext, DeleteShareGroupsHandler.class); + } + + @Override + public String apiName() { + return "deleteShareGroups"; + } + + @Override + public String displayName() { + return "DeleteShareGroups"; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index e91a2f92204..5c56665eb7d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1399,6 +1399,11 @@ public class MockAdminClient extends AdminClient { throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public synchronized DeleteShareGroupsResult deleteShareGroups(Collection groupIds, DeleteShareGroupsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection groupIds, DescribeClassicGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java index 8c4d9eb0eff..773708aa2f6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java @@ -14,117 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.clients.admin.internals; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.GroupIdNotFoundException; -import org.apache.kafka.common.errors.GroupNotEmptyException; -import org.apache.kafka.common.errors.InvalidGroupIdException; -import org.apache.kafka.common.message.DeleteGroupsResponseData; -import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; -import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.DeleteGroupsRequest; -import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.utils.LogContext; -import org.junit.jupiter.api.Test; - -import static java.util.Collections.emptyList; -import static java.util.Collections.emptySet; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonList; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - -public class DeleteConsumerGroupsHandlerTest { - - private final LogContext logContext = new LogContext(); - private final String groupId1 = "group-id1"; - - @Test - public void testBuildRequest() { - DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext); - DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build(); - assertEquals(1, request.data().groupsNames().size()); - assertEquals(groupId1, request.data().groupsNames().get(0)); - } - - @Test - public void testSuccessfulHandleResponse() { - assertCompleted(handleWithError(Errors.NONE)); - } - - @Test - public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); - } - - @Test - public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - } - - @Test - public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); - assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); - assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP)); - } - - private DeleteGroupsResponse buildResponse(Errors error) { - return new DeleteGroupsResponse( - new DeleteGroupsResponseData() - .setResults(new DeletableGroupResultCollection(singletonList( - new DeletableGroupResult() - .setErrorCode(error.code()) - .setGroupId(groupId1)).iterator()))); - } - - private AdminApiHandler.ApiResult handleWithError( - Errors error - ) { - DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext); - DeleteGroupsResponse response = buildResponse(error); - return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId1)), response); - } - - private void assertUnmapped( - AdminApiHandler.ApiResult result - ) { - assertEquals(emptySet(), result.completedKeys.keySet()); - assertEquals(emptySet(), result.failedKeys.keySet()); - assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)), result.unmappedKeys); - } - - private void assertRetriable( - AdminApiHandler.ApiResult result - ) { - assertEquals(emptySet(), result.completedKeys.keySet()); - assertEquals(emptySet(), result.failedKeys.keySet()); - assertEquals(emptyList(), result.unmappedKeys); - } - - private void assertCompleted( - AdminApiHandler.ApiResult result - ) { - CoordinatorKey key = CoordinatorKey.byGroupId(groupId1); - assertEquals(emptySet(), result.failedKeys.keySet()); - assertEquals(emptyList(), result.unmappedKeys); - assertEquals(singleton(key), result.completedKeys.keySet()); - } - - private void assertFailed( - Class expectedExceptionType, - AdminApiHandler.ApiResult result - ) { - CoordinatorKey key = CoordinatorKey.byGroupId(groupId1); - assertEquals(emptySet(), result.completedKeys.keySet()); - assertEquals(emptyList(), result.unmappedKeys); - assertEquals(singleton(key), result.failedKeys.keySet()); - assertInstanceOf(expectedExceptionType, result.failedKeys.get(key)); +public class DeleteConsumerGroupsHandlerTest extends DeleteGroupsHandlerTest { + protected DeleteGroupsHandler handler() { + return new DeleteConsumerGroupsHandler(new LogContext()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java new file mode 100644 index 00000000000..6b65de8d716 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; +import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DeleteGroupsRequest; +import org.apache.kafka.common.requests.DeleteGroupsResponse; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +public abstract class DeleteGroupsHandlerTest { + + private final String groupId1 = "group-id1"; + + private DeleteGroupsHandler handler; + + @BeforeEach + public void setup() { + handler = handler(); + } + + protected abstract DeleteGroupsHandler handler(); + + @Test + public void testBuildRequest() { + DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build(); + assertEquals(1, request.data().groupsNames().size()); + assertEquals(groupId1, request.data().groupsNames().get(0)); + } + + @Test + public void testSuccessfulHandleResponse() { + assertCompleted(handleWithError(Errors.NONE)); + } + + @Test + public void testUnmappedHandleResponse() { + assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + @Test + public void testRetriableHandleResponse() { + assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + } + + @Test + public void testFailedHandleResponse() { + assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); + assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); + assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP)); + } + + private DeleteGroupsResponse buildResponse(Errors error) { + return new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(new DeletableGroupResultCollection(singletonList( + new DeletableGroupResult() + .setErrorCode(error.code()) + .setGroupId(groupId1)).iterator()))); + } + + private AdminApiHandler.ApiResult handleWithError( + Errors error + ) { + DeleteGroupsResponse response = buildResponse(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId1)), response); + } + + private void assertUnmapped( + AdminApiHandler.ApiResult result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)), result.unmappedKeys); + } + + private void assertRetriable( + AdminApiHandler.ApiResult result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + } + + private void assertCompleted( + AdminApiHandler.ApiResult result + ) { + CoordinatorKey key = CoordinatorKey.byGroupId(groupId1); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(singleton(key), result.completedKeys.keySet()); + } + + private void assertFailed( + Class expectedExceptionType, + AdminApiHandler.ApiResult result + ) { + CoordinatorKey key = CoordinatorKey.byGroupId(groupId1); + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(singleton(key), result.failedKeys.keySet()); + assertInstanceOf(expectedExceptionType, result.failedKeys.get(key)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java new file mode 100644 index 00000000000..26f9d97f77e --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.utils.LogContext; + +public class DeleteShareGroupsHandlerTest extends DeleteGroupsHandlerTest { + protected DeleteGroupsHandler handler() { + return new DeleteShareGroupsHandler(new LogContext()); + } +} diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index 888d690e1f6..c996e148b2b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -55,6 +55,8 @@ import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions; import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; import org.apache.kafka.clients.admin.DeleteRecordsOptions; import org.apache.kafka.clients.admin.DeleteRecordsResult; +import org.apache.kafka.clients.admin.DeleteShareGroupsOptions; +import org.apache.kafka.clients.admin.DeleteShareGroupsResult; import org.apache.kafka.clients.admin.DeleteTopicsOptions; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeAclsOptions; @@ -428,6 +430,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient { return adminDelegate.listShareGroupOffsets(groupSpecs, options); } + @Override + public DeleteShareGroupsResult deleteShareGroups(final Collection groupIds, final DeleteShareGroupsOptions options) { + return adminDelegate.deleteShareGroups(groupIds, options); + } + @Override public DescribeClassicGroupsResult describeClassicGroups(final Collection groupIds, final DescribeClassicGroupsOptions options) { return adminDelegate.describeClassicGroups(groupIds, options); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index e5b01a12d9e..246a5eecbf1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -17,7 +17,9 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AbstractOptions; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteShareGroupsOptions; import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.ListGroupsOptions; @@ -30,6 +32,7 @@ import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; @@ -40,6 +43,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -81,7 +85,7 @@ public class ShareGroupCommand { } else if (opts.options.has(opts.describeOpt)) { shareGroupService.describeGroups(); } else if (opts.options.has(opts.deleteOpt)) { - throw new UnsupportedOperationException("--delete option is not yet implemented"); + shareGroupService.deleteShareGroups(); } else if (opts.options.has(opts.resetOffsetsOpt)) { throw new UnsupportedOperationException("--reset-offsets option is not yet implemented"); } else if (opts.options.has(opts.deleteOffsetsOpt)) { @@ -154,6 +158,18 @@ public class ShareGroupCommand { } } + List listDetailedShareGroups() { + try { + ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) + .withTypes(Set.of(GroupType.SHARE))); + Collection listings = result.all().get(); + return listings.stream().toList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + List listShareGroupsInStates(Set states) throws ExecutionException, InterruptedException { ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) @@ -208,6 +224,72 @@ public class ShareGroupCommand { } } + Map deleteShareGroups() { + List shareGroupIds = listDetailedShareGroups(); + List groupIds = opts.options.has(opts.allGroupsOpt) + ? shareGroupIds.stream().map(GroupListing::groupId).toList() + : opts.options.valuesOf(opts.groupOpt); + + // Pre admin call checks + LinkedHashSet groupIdSet = new LinkedHashSet<>(groupIds); + Map errGroups = new HashMap<>(); + for (String groupId : groupIdSet) { + Optional listing = shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny(); + if (listing.isEmpty()) { + errGroups.put(groupId, new IllegalArgumentException("Group '" + groupId + "' is not a share group.")); + } else { + Optional groupState = listing.get().groupState(); + groupState.ifPresent(state -> { + if (state == GroupState.DEAD) { + errGroups.put(groupId, new IllegalStateException("Share group '" + groupId + "' group state is DEAD.")); + } else if (state != GroupState.EMPTY) { + errGroups.put(groupId, new GroupNotEmptyException("Share group '" + groupId + "' is not EMPTY.")); + } + }); + } + } + + groupIdSet.removeAll(errGroups.keySet()); + + Map> groupsToDelete = groupIdSet.isEmpty() ? Map.of() : adminClient.deleteShareGroups( + groupIdSet.stream().toList(), + withTimeoutMs(new DeleteShareGroupsOptions()) + ).deletedGroups(); + + Map success = new HashMap<>(); + Map failed = new HashMap<>(errGroups); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + if (failed.isEmpty()) + System.out.println("Deletion of requested share groups (" + success.keySet().stream().map(group -> "'" + group + "'").collect(Collectors.joining(", ")) + ") was successful."); + else { + printError("Deletion of some share groups failed:", Optional.empty()); + failed.forEach((group, error) -> System.out.println("* Share group '" + group + "' could not be deleted due to: " + error)); + + if (!success.isEmpty()) + System.out.println("\nThese share groups were deleted successfully: " + success.keySet().stream().map(group -> "'" + group + "'").collect(Collectors.joining(","))); + } + + failed.putAll(success); + + return failed; + } + + private > T withTimeoutMs(T options) { + int t = opts.options.valueOf(opts.timeoutMsOpt).intValue(); + return options.timeoutMs(t); + } + Map describeShareGroups(Collection groupIds) throws ExecutionException, InterruptedException { Map res = new HashMap<>(); Map> stringKafkaFutureMap = adminClient.describeShareGroups( diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index 0c1a2a285ec..3ba0a707ee5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -173,9 +173,12 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { } if (options.has(deleteOpt)) { - if (!options.has(groupOpt)) + if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + deleteOpt + " takes the option: " + groupOpt); + String.format("Option %s takes the options %s or %s", deleteOpt, groupOpt, allGroupsOpt)); + if (options.has(allGroupsOpt) && options.has(groupOpt)) + CommandLineUtils.printUsageAndExit(parser, + String.format("Option %s takes either %s or %s, not both.", deleteOpt, groupOpt, allGroupsOpt)); if (options.has(topicOpt)) CommandLineUtils.printUsageAndExit(parser, "Option " + deleteOpt + " does not take the option: " + topicOpt); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index db58847d34a..46103e10f43 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.tools.consumer.group; + import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.DeleteShareGroupsResult; import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.DescribeShareGroupsResult; import org.apache.kafka.clients.admin.GroupListing; @@ -36,11 +38,14 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -48,7 +53,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -56,6 +63,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import joptsimple.OptionException; @@ -63,11 +71,15 @@ import joptsimple.OptionException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ShareGroupCommandTest { @@ -76,6 +88,18 @@ public class ShareGroupCommandTest { private static final List> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose")); private static final List> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList(); + @BeforeEach + public void setup() { + // nothing by default + Exit.setExitProcedure(((statusCode, message) -> { + })); + } + + @AfterEach + public void teardown() { + Exit.resetExitProcedure(); + } + @Test public void testListShareGroups() throws Exception { String firstGroup = "first-group"; @@ -588,8 +612,260 @@ public class ShareGroupCommandTest { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test + public void testDeleteShareGroupsArgs() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + mockListShareGroups(adminClient, new LinkedHashMap<>()); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete] takes the options [group] or [all-groups]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupsSuccess() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + Map> deletedGroups = Map.of( + firstGroup, KafkaFuture.completedFuture(null), + secondGroup, KafkaFuture.completedFuture(null) + ); + + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.EMPTY); + shareGroupMap.put(secondGroup, GroupState.EMPTY); + mockListShareGroups(adminClient, shareGroupMap); + + when(result.deletedGroups()).thenReturn(deletedGroups); + + Map expectedResults = new HashMap<>(); + expectedResults.put(firstGroup, null); + expectedResults.put(secondGroup, null); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + assertEquals(expectedResults, service.deleteShareGroups()); + } + } + + @Test + public void testDeleteShareGroupsAllGroupsSuccess() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--all-groups"}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + Map> deletedGroups = Map.of( + firstGroup, KafkaFuture.completedFuture(null), + secondGroup, KafkaFuture.completedFuture(null) + ); + + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.EMPTY); + shareGroupMap.put(secondGroup, GroupState.EMPTY); + mockListShareGroups(adminClient, shareGroupMap); + + when(result.deletedGroups()).thenReturn(deletedGroups); + + Map expectedResults = new HashMap<>(); + expectedResults.put(firstGroup, null); + expectedResults.put(secondGroup, null); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + assertEquals(expectedResults, service.deleteShareGroups()); + } + } + + @Test + public void testDeleteShareGroupsAllGroupsPartialFail() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--all-groups"}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + KafkaFutureImpl future1 = new KafkaFutureImpl<>(); + KafkaFutureImpl future2 = new KafkaFutureImpl<>(); + future1.complete(null); + Exception exp = new Exception("bad"); + future2.completeExceptionally(exp); + Map> deletedGroups = Map.of( + firstGroup, future1, + secondGroup, future2 + ); + + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.EMPTY); + shareGroupMap.put(secondGroup, GroupState.EMPTY); + mockListShareGroups(adminClient, shareGroupMap); + + when(result.deletedGroups()).thenReturn(deletedGroups); + + Map expectedResults = new HashMap<>(); + expectedResults.put(firstGroup, null); + expectedResults.put(secondGroup, exp); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + assertEquals(expectedResults, service.deleteShareGroups()); + } + } + + @Test + public void testDeleteShareGroupsDeleteFailure() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.EMPTY); + shareGroupMap.put(secondGroup, GroupState.EMPTY); + mockListShareGroups(adminClient, shareGroupMap); + + KafkaFutureImpl future = new KafkaFutureImpl<>(); + Exception exp = new Exception("bad"); + future.completeExceptionally(exp); + Map> deletedGroups = Map.of( + firstGroup, future, + secondGroup, future + ); + + when(result.deletedGroups()).thenReturn(deletedGroups); + + Map expectedResults = new HashMap<>(); + expectedResults.put(firstGroup, exp); + expectedResults.put(secondGroup, exp); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + assertEquals(expectedResults, service.deleteShareGroups()); + } + } + + @Test + public void testDeleteShareGroupsFailureNonShareGroup() { + String firstGroup = "first-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + mockListShareGroups(adminClient, new LinkedHashMap<>()); + + when(result.deletedGroups()).thenReturn(Map.of()); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + service.deleteShareGroups(); + verify(result, times(0)).deletedGroups(); + verify(adminClient, times(0)).deleteShareGroups(anyList()); + } + } + + @Test + public void testDeleteShareGroupsFailureNonEmptyGroup() { + String firstGroup = "first-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.STABLE); + mockListShareGroups(adminClient, shareGroupMap); + + when(result.deletedGroups()).thenReturn(Map.of()); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + service.deleteShareGroups(); + verify(result, times(0)).deletedGroups(); + verify(adminClient, times(0)).deleteShareGroups(anyList()); + } + } + + @Test + public void testDeleteShareGroupsPartialFailure() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete", "--group", firstGroup, "--group", secondGroup}; + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class); + LinkedHashMap shareGroupMap = new LinkedHashMap<>(); + shareGroupMap.put(firstGroup, GroupState.EMPTY); + shareGroupMap.put(secondGroup, GroupState.EMPTY); + mockListShareGroups(adminClient, shareGroupMap); + KafkaFutureImpl future1 = new KafkaFutureImpl<>(); + KafkaFutureImpl future2 = new KafkaFutureImpl<>(); + future1.complete(null); + Exception exp = new Exception("bad"); + future2.completeExceptionally(exp); + Map> deletedGroups = Map.of( + firstGroup, future1, + secondGroup, future2 + ); + + when(result.deletedGroups()).thenReturn(deletedGroups); + + when(adminClient.deleteShareGroups(anyList(), any())).thenReturn(result); + Map expectedResults = new HashMap<>(); + expectedResults.put(firstGroup, null); + expectedResults.put(secondGroup, exp); + + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + assertEquals(expectedResults, service.deleteShareGroups()); + } + } + + private void mockListShareGroups(Admin client, LinkedHashMap groupIds) { + ListGroupsResult listResult = mock(ListGroupsResult.class); + KafkaFutureImpl> listFuture = new KafkaFutureImpl<>(); + List groupListings = new ArrayList<>(); + groupIds.forEach((groupId, state) -> groupListings.add( + new GroupListing(groupId, Optional.of(GroupType.SHARE), "share", Optional.of(state)) + )); + listFuture.complete(groupListings); + when(listResult.all()).thenReturn(listFuture); + when(client.listGroups(any())).thenReturn(listResult); + } + ShareGroupService getShareGroupService(String[] args, Admin adminClient) { ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args); + opts.checkArgs(); return new ShareGroupService(opts, adminClient); }