From a6dfde7ce64ee00d257bb6ff20ce923dacdc0edb Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Fri, 11 Apr 2025 00:45:13 +0530 Subject: [PATCH] KAFKA-18629: Utilize share group partition metadata for delete group. (#19363) * Currently, the delete share group code flow uses `group.subscribedTopicNames()` to fetch information about all the share partitions to which a share group is subscribed to. * However, this is incorrect since once the group is EMPTY, a precondition for delete, the aforementioned method will return an empty list. * In this PR we have leveraged the `ShareGroupStatePartitionMetadata` record to grab the `initialized` and `initializing` partitions to build the delete candidates, which remedies the situation. Reviewers: Andrew Schofield --- .../group/GroupCoordinatorRecordHelpers.java | 15 +++ .../group/GroupCoordinatorService.java | 34 ++++-- .../group/GroupCoordinatorShard.java | 9 +- .../group/GroupMetadataManager.java | 79 ++++++++----- .../group/modern/share/ShareGroup.java | 1 + .../kafka/coordinator/group/Assertions.java | 31 ++++- .../group/GroupCoordinatorShardTest.java | 43 ++++--- .../group/GroupMetadataManagerTest.java | 108 ++++++++++++------ .../share/ShareCoordinatorShard.java | 10 ++ .../share/ShareCoordinatorShardTest.java | 23 +--- 10 files changed, 234 insertions(+), 119 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index d0f6901d444..42790515a2d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -814,6 +814,21 @@ public class GroupCoordinatorRecordHelpers { ); } + /** + * Creates a ShareGroupStatePartitionMetadata tombstone. + * + * @param groupId The share group id. + * @return The record. + */ + public static CoordinatorRecord newShareGroupStatePartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId) + ); + } + /** * Creates a ShareGroupStatePartitionMetadata record. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 97bc7e69622..c00502af48f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1073,24 +1073,25 @@ public class GroupCoordinatorService implements GroupCoordinator { groupsByTopicPartition.forEach((topicPartition, groupList) -> { CompletableFuture future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> { - DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); - List retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection); + DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List retainedGroupIds = updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, deletableGroupResults); if (retainedGroupIds.isEmpty()) { - return CompletableFuture.completedFuture(collection); + return CompletableFuture.completedFuture(deletableGroupResults); } return handleDeleteGroups(context, topicPartition, retainedGroupIds) - .whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate()))) - .thenApply(__ -> collection); + .whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate()))) + .thenApply(__ -> deletableGroupResults); }); // deleteShareGroups has its own exceptionally block, so we don't need one here. // This future object has the following stages: // - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests // method is invoked, and it returns request objects for each valid share group passed to it. + // All initialized and initializing share partitions are moved to deleting. // - Then the requests are passed to the persister.deleteState method one at a time. The results // are collated as a Map of groupId -> persister errors - // - The above map is then used to decide whether to invoke the group coordinator delete groups logic + // - The above map can be used to decide whether to invoke the group coordinator delete groups logic // - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete. // TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic futures.add(future); @@ -1102,17 +1103,26 @@ public class GroupCoordinatorService implements GroupCoordinator { (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } - private List deleteCandidateGroupIds( - Map groupErrMap, + /** + * Processes input shareGroupErrMap by retaining only those which do not contain an error. + * Also updates the result collection input arg with share groups containing errors. + * + * @param shareGroupErrMap Map keyed on share groupId and value as the error (NONE for no error). + * @param groupList Entire list of groups (all types) + * @param deletableGroupResults Collection of responses for delete groups request. + * @return A list of all non-error groupIds + */ + private List updateResponseAndGetNonErrorGroupList( + Map shareGroupErrMap, List groupList, - DeleteGroupsResponseData.DeletableGroupResultCollection collection + DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults ) { List errGroupIds = new ArrayList<>(); - groupErrMap.forEach((groupId, error) -> { + shareGroupErrMap.forEach((groupId, error) -> { if (error.code() != Errors.NONE.code()) { log.error("Error deleting share group {} due to error {}", groupId, error); errGroupIds.add(groupId); - collection.add( + deletableGroupResults.add( new DeleteGroupsResponseData.DeletableGroupResult() .setGroupId(groupId) .setErrorCode(error.code()) @@ -1153,7 +1163,7 @@ public class GroupCoordinatorService implements GroupCoordinator { TopicPartition topicPartition, List groupList ) { - // topicPartition refers to internal topic __consumer_offsets + // topicPartition refers to internal topic __consumer_offsets. return runtime.scheduleWriteOperation( "delete-share-groups", topicPartition, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index ded14600dab..812641c9459 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -589,18 +589,19 @@ public class GroupCoordinatorShard implements CoordinatorShard

+ *

* The groupIds are first filtered by type to restrict the list to share groups. * @param groupIds - A list of groupIds as string - * @return {@link CoordinatorResult} object always containing empty records and Map keyed on groupId and value pair (req, error) + * @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records. */ public CoordinatorResult>, CoordinatorRecord> sharePartitionDeleteRequests(List groupIds) { Map> responseMap = new HashMap<>(); + List records = new ArrayList<>(); for (String groupId : groupIds) { try { ShareGroup group = groupMetadataManager.shareGroup(groupId); group.validateDeleteGroup(); - groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group) + groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records) .ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE))); } catch (GroupIdNotFoundException exception) { log.debug("GroupId {} not found as a share group.", groupId); @@ -609,7 +610,7 @@ public class GroupCoordinatorShard implements CoordinatorShard(List.of(), responseMap); + return new CoordinatorResult<>(records, responseMap); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 08e9159b02d..f8a114d999f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2984,7 +2984,7 @@ public class GroupMetadataManager { groupId, attachTopicName(finalInitializingMap), attachTopicName(currentMap.initializedTopics()), - Map.of() + attachTopicName(currentMap.deletingTopics()) ) ); } @@ -4979,7 +4979,7 @@ public class GroupMetadataManager { group.groupId(), attachTopicName(finalInitializingMap), attachTopicName(finalInitializedMap), - Map.of() + attachTopicName(currentMap.deletingTopics()) )), null ); @@ -5025,7 +5025,7 @@ public class GroupMetadataManager { groupId, attachTopicName(finalInitializingTopics), attachTopicName(info.initializedTopics()), - Map.of() + attachTopicName(info.deletingTopics()) ) ), null @@ -5057,6 +5057,13 @@ public class GroupMetadataManager { return requests; } + private Map attachTopicName(Set topicIds) { + TopicsImage topicsImage = metadataImage.topics(); + return topicIds.stream() + .map(topicId -> Map.entry(topicId, topicsImage.getTopic(topicId).name())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + private Map>> attachTopicName(Map> initMap) { TopicsImage topicsImage = metadataImage.topics(); Map>> finalMap = new HashMap<>(); @@ -8142,39 +8149,49 @@ public class GroupMetadataManager { /** * Returns an optional of delete share group request object to be used with the persister. * Empty if no subscribed topics or if the share group is empty. - * @param shareGroup - A share group + * @param shareGroupId Share group id + * @param records List of coordinator records to append to * @return Optional of object representing the share group state delete request. */ - public Optional shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) { - TopicsImage topicsImage = metadataImage.topics(); - Set subscribedTopics = shareGroup.subscribedTopicNames().keySet(); - List> topicDataList = new ArrayList<>(subscribedTopics.size()); - - for (String topic : subscribedTopics) { - TopicImage topicImage = topicsImage.getTopic(topic); - topicDataList.add( - new TopicData<>( - topicImage.id(), - topicImage.partitions().keySet().stream() - .map(PartitionFactory::newPartitionIdData) - .toList() - ) - ); - } - - if (topicDataList.isEmpty()) { + public Optional shareGroupBuildPartitionDeleteRequest(String shareGroupId, List records) { + if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) { return Optional.empty(); } - return Optional.of( - new DeleteShareGroupStateParameters.Builder() - .setGroupTopicPartitionData( - new GroupTopicPartitionData.Builder() - .setGroupId(shareGroup.groupId()) - .setTopicsData(topicDataList) - .build() - ) - .build() + Map> deleteCandidates = mergeShareGroupInitMaps( + shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(), + shareGroupPartitionMetadata.get(shareGroupId).initializingTopics() + ); + + if (deleteCandidates.isEmpty()) { + return Optional.empty(); + } + + List> topicDataList = new ArrayList<>(deleteCandidates.size()); + + for (Map.Entry> entry : deleteCandidates.entrySet()) { + topicDataList.add(new TopicData<>( + entry.getKey(), + entry.getValue().stream() + .map(PartitionFactory::newPartitionIdData) + .toList() + )); + } + + // Remove all initializing and initialized topic info from record and add deleting. + records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + shareGroupId, + Map.of(), + Map.of(), + attachTopicName(deleteCandidates.keySet()) + )); + + return Optional.of(new DeleteShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(shareGroupId) + .setTopicsData(topicDataList) + .build()) + .build() ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index 1e7d11f866a..b63100744c6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -236,6 +236,7 @@ public class ShareGroup extends ModernGroup { ); records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId())); + records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId())); records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId())); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java index 227d07a8def..a593bb3d664 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java @@ -31,6 +31,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.opentest4j.AssertionFailedError; @@ -60,7 +61,8 @@ public class Assertions { ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue, GroupMetadataValue.class, Assertions::assertGroupMetadataValue, ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue, - ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue + ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue, + ShareGroupStatePartitionMetadataValue.class, Assertions::assertShareGroupStatePartitionMetadataValue ); public static void assertResponseEquals( @@ -285,6 +287,33 @@ public class Assertions { assertEquals(expected, actual); } + private static void assertShareGroupStatePartitionMetadataValue( + ApiMessage exp, + ApiMessage act + ) { + ShareGroupStatePartitionMetadataValue expected = (ShareGroupStatePartitionMetadataValue) exp.duplicate(); + ShareGroupStatePartitionMetadataValue actual = (ShareGroupStatePartitionMetadataValue) act.duplicate(); + + Consumer normalize = message -> { + message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId)); + message.initializedTopics().forEach(topic -> { + topic.partitions().sort(Comparator.naturalOrder()); + }); + + message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId)); + message.initializingTopics().forEach(topic -> { + topic.partitions().sort(Comparator.naturalOrder()); + }); + + message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId)); + }; + + normalize.accept(expected); + normalize.accept(actual); + + assertEquals(expected, actual); + } + private static void assertGroupMetadataValue( ApiMessage exp, ApiMessage act diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index f8e0d8a70f6..16665f98bf2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -1860,9 +1860,10 @@ public class GroupCoordinatorShardTest { metricsShard ); - ShareGroup shareGroup = new ShareGroup(new SnapshotRegistry(mock(LogContext.class)), "share-group"); + String groupId = "share-group"; + ShareGroup shareGroup = new ShareGroup(new SnapshotRegistry(mock(LogContext.class)), groupId); - when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup); + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class); TopicData topicData = new TopicData<>(Uuid.randomUuid(), @@ -1873,21 +1874,20 @@ public class GroupCoordinatorShardTest { DeleteShareGroupStateParameters params = new DeleteShareGroupStateParameters.Builder() .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() - .setGroupId("share-group") + .setGroupId(groupId) .setTopicsData(List.of(topicData)) .build()) .build(); - when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(shareGroup))).thenReturn(Optional.of(params)); + when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList())).thenReturn(Optional.of(params)); CoordinatorResult>, CoordinatorRecord> expectedResult = - new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(params, Errors.NONE))); + new CoordinatorResult<>(List.of(), Map.of(groupId, Map.entry(params, Errors.NONE))); - - assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group", "non-share-group"))); - verify(groupMetadataManager, times(1)).shareGroup(eq("share-group")); + assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of(groupId, "non-share-group"))); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); verify(groupMetadataManager, times(1)).shareGroup(eq("non-share-group")); - verify(groupMetadataManager, times(1)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); + verify(groupMetadataManager, times(1)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList()); // empty list Mockito.reset(groupMetadataManager); @@ -1897,9 +1897,9 @@ public class GroupCoordinatorShardTest { coordinator.sharePartitionDeleteRequests(List.of()) ); - verify(groupMetadataManager, times(0)).group(eq("share-group")); + verify(groupMetadataManager, times(0)).group(eq(groupId)); verify(groupMetadataManager, times(0)).group(eq("non-share-group")); - verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); + verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList()); } @Test @@ -1919,19 +1919,24 @@ public class GroupCoordinatorShardTest { metricsShard ); + String groupId = "share-group"; ShareGroup shareGroup = mock(ShareGroup.class); doThrow(new GroupNotEmptyException("bad stuff")).when(shareGroup).validateDeleteGroup(); - when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup); + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); CoordinatorResult>, CoordinatorRecord> expectedResult = - new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, - Errors.forException(new GroupNotEmptyException("bad stuff"))) - )); - assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group"))); - verify(groupMetadataManager, times(1)).shareGroup(eq("share-group")); + new CoordinatorResult<>( + List.of(), + Map.of( + groupId, + Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(new GroupNotEmptyException("bad stuff"))) + ) + ); + assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of(groupId))); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); // Not called because of NON-EMPTY group. - verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); + verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList()); // empty list Mockito.reset(groupMetadataManager); @@ -1942,6 +1947,6 @@ public class GroupCoordinatorShardTest { ); verify(groupMetadataManager, times(0)).group(eq("share-group")); - verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); + verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 95e247beef7..35574d35ca3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -99,7 +99,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberState; -import org.apache.kafka.coordinator.group.modern.SubscriptionCount; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; @@ -128,9 +127,7 @@ import org.apache.kafka.server.authorizer.Action; import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; -import org.apache.kafka.server.share.persister.GroupTopicPartitionData; import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters; -import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionIdData; import org.apache.kafka.server.share.persister.PartitionStateData; import org.apache.kafka.server.share.persister.TopicData; @@ -147,7 +144,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -209,7 +205,6 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -15421,7 +15416,7 @@ public class GroupMetadataManagerTest { } @Test - public void testShareGroupDelete() { + public void testShareGroupDeleteTombstones() { String groupId = "share-group-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withShareGroup(new ShareGroupBuilder(groupId, 10)) @@ -15430,6 +15425,7 @@ public class GroupMetadataManagerTest { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId), + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId) ); List records = new ArrayList<>(); @@ -20745,7 +20741,7 @@ public class GroupMetadataManagerTest { } @Test - public void testSharePartitionDeleteRequest() { + public void testShareGroupDeleteRequest() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -20754,38 +20750,58 @@ public class GroupMetadataManagerTest { Uuid t1Uuid = Uuid.randomUuid(); Uuid t2Uuid = Uuid.randomUuid(); - MetadataImage image = spy(new MetadataImageBuilder() - .addTopic(t1Uuid, "t1", 2) - .addTopic(t2Uuid, "t2", 2) - .build()); - - context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + String t1Name = "t1"; + String t2Name = "t2"; + String groupId = "share-group"; ShareGroup shareGroup = mock(ShareGroup.class); - Map topicMap = new LinkedHashMap<>(); - topicMap.put("t1", mock(SubscriptionCount.class)); - topicMap.put("t2", mock(SubscriptionCount.class)); - when(shareGroup.subscribedTopicNames()).thenReturn(topicMap); - when(shareGroup.groupId()).thenReturn("share-group"); + when(shareGroup.groupId()).thenReturn(groupId); when(shareGroup.isEmpty()).thenReturn(false); - DeleteShareGroupStateParameters expectedParameters = new DeleteShareGroupStateParameters.Builder() - .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() - .setGroupId("share-group") - .setTopicsData(List.of( - new TopicData<>(t1Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1))), - new TopicData<>(t2Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1))) - )) - .build() - ) + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .addTopic(t2Uuid, t2Name, 2) .build(); - Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(shareGroup); - assertTrue(params.isPresent()); - assertEquals(expectedParameters.groupTopicPartitionData(), params.get().groupTopicPartitionData()); - verify(image, times(1)).topics(); - verify(shareGroup, times(1)).subscribedTopicNames(); - verify(shareGroup, times(1)).groupId(); + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0)); + + context.replay( + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))), + Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))), + Map.of() + ) + ); + + context.commit(); + + Map> expectedTopicPartitionMap = Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ); + + List expectedRecords = List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(), + Map.of(), + Map.of(t1Uuid, t1Name, t2Uuid, t2Name) + ) + ); + + List records = new ArrayList<>(); + Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records); + verifyShareGroupDeleteRequest( + params, + expectedTopicPartitionMap, + groupId, + true + ); + assertRecordsEquals(expectedRecords, records); } @Test @@ -20844,14 +20860,15 @@ public class GroupMetadataManagerTest { new ShareGroupStatePartitionMetadataKey() .setGroupId(groupId), new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) .setInitializedTopics(List.of( new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(t1Uuid) - .setTopicName("t1") + .setTopicName(t1Name) .setPartitions(List.of(0, 1)), new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(t2Uuid) - .setTopicName("t2") + .setTopicName(t2Name) .setPartitions(List.of(0, 1)) )) .setDeletingTopics(List.of()) @@ -21292,4 +21309,25 @@ public class GroupMetadataManagerTest { assertTrue(initRequest.isEmpty()); } } + + private void verifyShareGroupDeleteRequest( + Optional deleteRequest, + Map> expectedTopicPartitionsMap, + String groupId, + boolean shouldExist + ) { + if (shouldExist) { + assertTrue(deleteRequest.isPresent()); + DeleteShareGroupStateParameters request = deleteRequest.get(); + assertEquals(groupId, request.groupTopicPartitionData().groupId()); + Map> actualTopicPartitionsMap = new HashMap<>(); + for (TopicData topicData : request.groupTopicPartitionData().topicsData()) { + actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>()) + .addAll(topicData.partitions().stream().map(PartitionIdData::partition).toList()); + } + assertEquals(expectedTopicPartitionsMap, actualTopicPartitionsMap); + } else { + assertTrue(deleteRequest.isEmpty()); + } + } } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index e0118ee2499..35891bddf8d 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -515,6 +515,16 @@ public class ShareCoordinatorShard implements CoordinatorShard(List.of(), new DeleteShareGroupStateResponseData().setResults( + List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(), + List.of(DeleteShareGroupStateResponse.toResponsePartitionResult( + key.partition())) + )) + )); + } + CoordinatorRecord record = generateTombstoneRecord(key); // build successful response if record is correctly created DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults( diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index 0426c4e5005..20be20832d8 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -994,9 +994,7 @@ class ShareCoordinatorShardTest { .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION))))); - CoordinatorResult result = shard.deleteState(request); - - // apply a record in to verify delete + // Apply a record to the state machine so that delete can be verified. CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, @@ -1021,7 +1019,9 @@ class ShareCoordinatorShardTest { assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); - // apply tombstone + CoordinatorResult result = shard.deleteState(request); + + // Apply tombstone. shard.replay(0L, 0L, (short) 0, result.records().get(0)); DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); @@ -1039,7 +1039,7 @@ class ShareCoordinatorShardTest { } @Test - public void testDeleteStateFirstRecordDeleteSuccess() { + public void testDeleteStateUnintializedRecord() { ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); @@ -1057,21 +1057,10 @@ class ShareCoordinatorShardTest { assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); - // apply tombstone - shard.replay(0L, 0L, (short) 0, result.records().get(0)); - DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = List.of( - ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( - GROUP_ID, TOPIC_ID, PARTITION) - ); assertEquals(expectedData, result.response()); - assertEquals(expectedRecords, result.records()); - - assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); - assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); - assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + assertEquals(List.of(), result.records()); } @Test