diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 99ffeb8e70e..c8a0f49d593 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -73,6 +73,7 @@
+
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a5c99e3d571..286eef4af02 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -80,8 +80,13 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
+import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
+import org.apache.kafka.server.share.persister.PartitionErrorData;
+import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
+import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
@@ -91,11 +96,13 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -109,6 +116,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
/**
* The group coordinator service.
*/
+@SuppressWarnings({"ClassDataAbstractionCoupling"})
public class GroupCoordinatorService implements GroupCoordinator {
public static class Builder {
@@ -823,20 +831,27 @@ public class GroupCoordinatorService implements GroupCoordinator {
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
- CompletableFuture future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
+ CompletableFuture future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection);
+ if (retainedGroupIds.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+ return handleDeleteGroups(context, topicPartition, retainedGroupIds)
+ .whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate())))
+ .thenApply(__ -> collection);
+ });
+ // deleteShareGroups has its own exceptionally block, so we don't need one here.
+
+ // This future object has the following stages:
+ // - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
+ // method is invoked, and it returns request objects for each valid share group passed to it.
+ // - Then the requests are passed to the persister.deleteState method one at a time. The results
+ // are collated as a Map of groupId -> persister errors
+ // - The above map is then used to decide whether to invoke the group coordinator delete groups logic
+ // - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
+ // TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic
futures.add(future);
});
@@ -846,6 +861,152 @@ public class GroupCoordinatorService implements GroupCoordinator {
(accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
}
+ private List deleteCandidateGroupIds(
+ Map groupErrMap,
+ List groupList,
+ DeleteGroupsResponseData.DeletableGroupResultCollection collection
+ ) {
+ List errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to error {}", groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ groupSet.removeAll(errGroupIds);
+
+ // Let us invoke the standard procedure of any non-share
+ // groups or successfully deleted share groups remaining.
+ return groupSet.stream().toList();
+ }
+
+ private CompletableFuture handleDeleteGroups(
+ RequestContext context,
+ TopicPartition topicPartition,
+ List groupIds
+ ) {
+ return runtime.scheduleWriteOperation(
+ "delete-groups",
+ topicPartition,
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
+ coordinator -> coordinator.deleteGroups(context, groupIds)
+ ).exceptionally(exception -> handleOperationException(
+ "delete-groups",
+ groupIds,
+ exception,
+ (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupIds, error),
+ log
+ ));
+ }
+
+ private CompletableFuture