KAFKA-18629: Utilize share group partition metadata for delete group. (#19363)

* Currently, the delete share group code flow uses
`group.subscribedTopicNames()` to fetch information about all the share
partitions to which a share group is subscribed to.
* However, this is incorrect since once the group is EMPTY, a
precondition for delete, the aforementioned method will return an empty
list.
* In this PR we have leveraged the `ShareGroupStatePartitionMetadata`
record to grab the `initialized` and `initializing` partitions to build
the delete candidates, which remedies the situation.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-04-11 00:45:13 +05:30 committed by GitHub
parent c11938c926
commit a6dfde7ce6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 234 additions and 119 deletions

View File

@ -814,6 +814,21 @@ public class GroupCoordinatorRecordHelpers {
); );
} }
/**
* Creates a ShareGroupStatePartitionMetadata tombstone.
*
* @param groupId The share group id.
* @return The record.
*/
public static CoordinatorRecord newShareGroupStatePartitionMetadataTombstoneRecord(
String groupId
) {
return CoordinatorRecord.tombstone(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId)
);
}
/** /**
* Creates a ShareGroupStatePartitionMetadata record. * Creates a ShareGroupStatePartitionMetadata record.
* *

View File

@ -1073,24 +1073,25 @@ public class GroupCoordinatorService implements GroupCoordinator {
groupsByTopicPartition.forEach((topicPartition, groupList) -> { groupsByTopicPartition.forEach((topicPartition, groupList) -> {
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> { CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection();
List<String> retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection); List<String> retainedGroupIds = updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, deletableGroupResults);
if (retainedGroupIds.isEmpty()) { if (retainedGroupIds.isEmpty()) {
return CompletableFuture.completedFuture(collection); return CompletableFuture.completedFuture(deletableGroupResults);
} }
return handleDeleteGroups(context, topicPartition, retainedGroupIds) return handleDeleteGroups(context, topicPartition, retainedGroupIds)
.whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate()))) .whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate())))
.thenApply(__ -> collection); .thenApply(__ -> deletableGroupResults);
}); });
// deleteShareGroups has its own exceptionally block, so we don't need one here. // deleteShareGroups has its own exceptionally block, so we don't need one here.
// This future object has the following stages: // This future object has the following stages:
// - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests // - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
// method is invoked, and it returns request objects for each valid share group passed to it. // method is invoked, and it returns request objects for each valid share group passed to it.
// All initialized and initializing share partitions are moved to deleting.
// - Then the requests are passed to the persister.deleteState method one at a time. The results // - Then the requests are passed to the persister.deleteState method one at a time. The results
// are collated as a Map of groupId -> persister errors // are collated as a Map of groupId -> persister errors
// - The above map is then used to decide whether to invoke the group coordinator delete groups logic // - The above map can be used to decide whether to invoke the group coordinator delete groups logic
// - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete. // - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
// TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic // TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic
futures.add(future); futures.add(future);
@ -1102,17 +1103,26 @@ public class GroupCoordinatorService implements GroupCoordinator {
(accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
} }
private List<String> deleteCandidateGroupIds( /**
Map<String, Errors> groupErrMap, * Processes input shareGroupErrMap by retaining only those which do not contain an error.
* Also updates the result collection input arg with share groups containing errors.
*
* @param shareGroupErrMap Map keyed on share groupId and value as the error (NONE for no error).
* @param groupList Entire list of groups (all types)
* @param deletableGroupResults Collection of responses for delete groups request.
* @return A list of all non-error groupIds
*/
private List<String> updateResponseAndGetNonErrorGroupList(
Map<String, Errors> shareGroupErrMap,
List<String> groupList, List<String> groupList,
DeleteGroupsResponseData.DeletableGroupResultCollection collection DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults
) { ) {
List<String> errGroupIds = new ArrayList<>(); List<String> errGroupIds = new ArrayList<>();
groupErrMap.forEach((groupId, error) -> { shareGroupErrMap.forEach((groupId, error) -> {
if (error.code() != Errors.NONE.code()) { if (error.code() != Errors.NONE.code()) {
log.error("Error deleting share group {} due to error {}", groupId, error); log.error("Error deleting share group {} due to error {}", groupId, error);
errGroupIds.add(groupId); errGroupIds.add(groupId);
collection.add( deletableGroupResults.add(
new DeleteGroupsResponseData.DeletableGroupResult() new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId) .setGroupId(groupId)
.setErrorCode(error.code()) .setErrorCode(error.code())
@ -1153,7 +1163,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
TopicPartition topicPartition, TopicPartition topicPartition,
List<String> groupList List<String> groupList
) { ) {
// topicPartition refers to internal topic __consumer_offsets // topicPartition refers to internal topic __consumer_offsets.
return runtime.scheduleWriteOperation( return runtime.scheduleWriteOperation(
"delete-share-groups", "delete-share-groups",
topicPartition, topicPartition,

View File

@ -589,18 +589,19 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters} * Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters}
* and any ERRORS while building the request corresponding * and any ERRORS while building the request corresponding
* to the valid share groups passed as the input. * to the valid share groups passed as the input.
* <p></p> * <p>
* The groupIds are first filtered by type to restrict the list to share groups. * The groupIds are first filtered by type to restrict the list to share groups.
* @param groupIds - A list of groupIds as string * @param groupIds - A list of groupIds as string
* @return {@link CoordinatorResult} object always containing empty records and Map keyed on groupId and value pair (req, error) * @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records.
*/ */
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) { public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) {
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>(); Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>();
List<CoordinatorRecord> records = new ArrayList<>();
for (String groupId : groupIds) { for (String groupId : groupIds) {
try { try {
ShareGroup group = groupMetadataManager.shareGroup(groupId); ShareGroup group = groupMetadataManager.shareGroup(groupId);
group.validateDeleteGroup(); group.validateDeleteGroup();
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group) groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records)
.ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE))); .ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE)));
} catch (GroupIdNotFoundException exception) { } catch (GroupIdNotFoundException exception) {
log.debug("GroupId {} not found as a share group.", groupId); log.debug("GroupId {} not found as a share group.", groupId);
@ -609,7 +610,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception))); responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception)));
} }
} }
return new CoordinatorResult<>(List.of(), responseMap); return new CoordinatorResult<>(records, responseMap);
} }
/** /**

View File

@ -2984,7 +2984,7 @@ public class GroupMetadataManager {
groupId, groupId,
attachTopicName(finalInitializingMap), attachTopicName(finalInitializingMap),
attachTopicName(currentMap.initializedTopics()), attachTopicName(currentMap.initializedTopics()),
Map.of() attachTopicName(currentMap.deletingTopics())
) )
); );
} }
@ -4979,7 +4979,7 @@ public class GroupMetadataManager {
group.groupId(), group.groupId(),
attachTopicName(finalInitializingMap), attachTopicName(finalInitializingMap),
attachTopicName(finalInitializedMap), attachTopicName(finalInitializedMap),
Map.of() attachTopicName(currentMap.deletingTopics())
)), )),
null null
); );
@ -5025,7 +5025,7 @@ public class GroupMetadataManager {
groupId, groupId,
attachTopicName(finalInitializingTopics), attachTopicName(finalInitializingTopics),
attachTopicName(info.initializedTopics()), attachTopicName(info.initializedTopics()),
Map.of() attachTopicName(info.deletingTopics())
) )
), ),
null null
@ -5057,6 +5057,13 @@ public class GroupMetadataManager {
return requests; return requests;
} }
private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
TopicsImage topicsImage = metadataImage.topics();
return topicIds.stream()
.map(topicId -> Map.entry(topicId, topicsImage.getTopic(topicId).name()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) { private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) {
TopicsImage topicsImage = metadataImage.topics(); TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>(); Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
@ -8142,39 +8149,49 @@ public class GroupMetadataManager {
/** /**
* Returns an optional of delete share group request object to be used with the persister. * Returns an optional of delete share group request object to be used with the persister.
* Empty if no subscribed topics or if the share group is empty. * Empty if no subscribed topics or if the share group is empty.
* @param shareGroup - A share group * @param shareGroupId Share group id
* @param records List of coordinator records to append to
* @return Optional of object representing the share group state delete request. * @return Optional of object representing the share group state delete request.
*/ */
public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) { public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(String shareGroupId, List<CoordinatorRecord> records) {
TopicsImage topicsImage = metadataImage.topics(); if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) {
Set<String> subscribedTopics = shareGroup.subscribedTopicNames().keySet();
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(subscribedTopics.size());
for (String topic : subscribedTopics) {
TopicImage topicImage = topicsImage.getTopic(topic);
topicDataList.add(
new TopicData<>(
topicImage.id(),
topicImage.partitions().keySet().stream()
.map(PartitionFactory::newPartitionIdData)
.toList()
)
);
}
if (topicDataList.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
return Optional.of( Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps(
new DeleteShareGroupStateParameters.Builder() shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(),
.setGroupTopicPartitionData( shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
new GroupTopicPartitionData.Builder<PartitionIdData>() );
.setGroupId(shareGroup.groupId())
.setTopicsData(topicDataList) if (deleteCandidates.isEmpty()) {
.build() return Optional.empty();
) }
.build()
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size());
for (Map.Entry<Uuid, Set<Integer>> entry : deleteCandidates.entrySet()) {
topicDataList.add(new TopicData<>(
entry.getKey(),
entry.getValue().stream()
.map(PartitionFactory::newPartitionIdData)
.toList()
));
}
// Remove all initializing and initialized topic info from record and add deleting.
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
shareGroupId,
Map.of(),
Map.of(),
attachTopicName(deleteCandidates.keySet())
));
return Optional.of(new DeleteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
.setGroupId(shareGroupId)
.setTopicsData(topicDataList)
.build())
.build()
); );
} }

View File

@ -236,6 +236,7 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
); );
records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId())); records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId()));
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId()));
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId())); records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId()));
} }

View File

@ -31,6 +31,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.opentest4j.AssertionFailedError; import org.opentest4j.AssertionFailedError;
@ -60,7 +61,8 @@ public class Assertions {
ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue,
GroupMetadataValue.class, Assertions::assertGroupMetadataValue, GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue,
ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue,
ShareGroupStatePartitionMetadataValue.class, Assertions::assertShareGroupStatePartitionMetadataValue
); );
public static void assertResponseEquals( public static void assertResponseEquals(
@ -285,6 +287,33 @@ public class Assertions {
assertEquals(expected, actual); assertEquals(expected, actual);
} }
private static void assertShareGroupStatePartitionMetadataValue(
ApiMessage exp,
ApiMessage act
) {
ShareGroupStatePartitionMetadataValue expected = (ShareGroupStatePartitionMetadataValue) exp.duplicate();
ShareGroupStatePartitionMetadataValue actual = (ShareGroupStatePartitionMetadataValue) act.duplicate();
Consumer<ShareGroupStatePartitionMetadataValue> normalize = message -> {
message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
message.initializedTopics().forEach(topic -> {
topic.partitions().sort(Comparator.naturalOrder());
});
message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
message.initializingTopics().forEach(topic -> {
topic.partitions().sort(Comparator.naturalOrder());
});
message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId));
};
normalize.accept(expected);
normalize.accept(actual);
assertEquals(expected, actual);
}
private static void assertGroupMetadataValue( private static void assertGroupMetadataValue(
ApiMessage exp, ApiMessage exp,
ApiMessage act ApiMessage act

View File

@ -1860,9 +1860,10 @@ public class GroupCoordinatorShardTest {
metricsShard metricsShard
); );
ShareGroup shareGroup = new ShareGroup(new SnapshotRegistry(mock(LogContext.class)), "share-group"); String groupId = "share-group";
ShareGroup shareGroup = new ShareGroup(new SnapshotRegistry(mock(LogContext.class)), groupId);
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup); when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class); when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class);
TopicData<PartitionIdData> topicData = new TopicData<>(Uuid.randomUuid(), TopicData<PartitionIdData> topicData = new TopicData<>(Uuid.randomUuid(),
@ -1873,21 +1874,20 @@ public class GroupCoordinatorShardTest {
DeleteShareGroupStateParameters params = new DeleteShareGroupStateParameters.Builder() DeleteShareGroupStateParameters params = new DeleteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>() .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
.setGroupId("share-group") .setGroupId(groupId)
.setTopicsData(List.of(topicData)) .setTopicsData(List.of(topicData))
.build()) .build())
.build(); .build();
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(shareGroup))).thenReturn(Optional.of(params)); when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList())).thenReturn(Optional.of(params));
CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult = CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(params, Errors.NONE))); new CoordinatorResult<>(List.of(), Map.of(groupId, Map.entry(params, Errors.NONE)));
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of(groupId, "non-share-group")));
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group", "non-share-group"))); verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
verify(groupMetadataManager, times(1)).shareGroup(eq("non-share-group")); verify(groupMetadataManager, times(1)).shareGroup(eq("non-share-group"));
verify(groupMetadataManager, times(1)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); verify(groupMetadataManager, times(1)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
// empty list // empty list
Mockito.reset(groupMetadataManager); Mockito.reset(groupMetadataManager);
@ -1897,9 +1897,9 @@ public class GroupCoordinatorShardTest {
coordinator.sharePartitionDeleteRequests(List.of()) coordinator.sharePartitionDeleteRequests(List.of())
); );
verify(groupMetadataManager, times(0)).group(eq("share-group")); verify(groupMetadataManager, times(0)).group(eq(groupId));
verify(groupMetadataManager, times(0)).group(eq("non-share-group")); verify(groupMetadataManager, times(0)).group(eq("non-share-group"));
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
} }
@Test @Test
@ -1919,19 +1919,24 @@ public class GroupCoordinatorShardTest {
metricsShard metricsShard
); );
String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class); ShareGroup shareGroup = mock(ShareGroup.class);
doThrow(new GroupNotEmptyException("bad stuff")).when(shareGroup).validateDeleteGroup(); doThrow(new GroupNotEmptyException("bad stuff")).when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup); when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult = CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, new CoordinatorResult<>(
Errors.forException(new GroupNotEmptyException("bad stuff"))) List.of(),
)); Map.of(
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group"))); groupId,
verify(groupMetadataManager, times(1)).shareGroup(eq("share-group")); Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(new GroupNotEmptyException("bad stuff")))
)
);
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of(groupId)));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of NON-EMPTY group. // Not called because of NON-EMPTY group.
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
// empty list // empty list
Mockito.reset(groupMetadataManager); Mockito.reset(groupMetadataManager);
@ -1942,6 +1947,6 @@ public class GroupCoordinatorShardTest {
); );
verify(groupMetadataManager, times(0)).group(eq("share-group")); verify(groupMetadataManager, times(0)).group(eq("share-group"));
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup)); verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
} }
} }

View File

@ -99,7 +99,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
@ -128,9 +127,7 @@ import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters; import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData; import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateData; import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.TopicData; import org.apache.kafka.server.share.persister.TopicData;
@ -147,7 +144,6 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -209,7 +205,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -15421,7 +15416,7 @@ public class GroupMetadataManagerTest {
} }
@Test @Test
public void testShareGroupDelete() { public void testShareGroupDeleteTombstones() {
String groupId = "share-group-id"; String groupId = "share-group-id";
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroup(new ShareGroupBuilder(groupId, 10)) .withShareGroup(new ShareGroupBuilder(groupId, 10))
@ -15430,6 +15425,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of( List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId),
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId),
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId) GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
); );
List<CoordinatorRecord> records = new ArrayList<>(); List<CoordinatorRecord> records = new ArrayList<>();
@ -20745,7 +20741,7 @@ public class GroupMetadataManagerTest {
} }
@Test @Test
public void testSharePartitionDeleteRequest() { public void testShareGroupDeleteRequest() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range"); MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@ -20754,38 +20750,58 @@ public class GroupMetadataManagerTest {
Uuid t1Uuid = Uuid.randomUuid(); Uuid t1Uuid = Uuid.randomUuid();
Uuid t2Uuid = Uuid.randomUuid(); Uuid t2Uuid = Uuid.randomUuid();
MetadataImage image = spy(new MetadataImageBuilder() String t1Name = "t1";
.addTopic(t1Uuid, "t1", 2) String t2Name = "t2";
.addTopic(t2Uuid, "t2", 2)
.build());
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class); ShareGroup shareGroup = mock(ShareGroup.class);
Map<String, SubscriptionCount> topicMap = new LinkedHashMap<>(); when(shareGroup.groupId()).thenReturn(groupId);
topicMap.put("t1", mock(SubscriptionCount.class));
topicMap.put("t2", mock(SubscriptionCount.class));
when(shareGroup.subscribedTopicNames()).thenReturn(topicMap);
when(shareGroup.groupId()).thenReturn("share-group");
when(shareGroup.isEmpty()).thenReturn(false); when(shareGroup.isEmpty()).thenReturn(false);
DeleteShareGroupStateParameters expectedParameters = new DeleteShareGroupStateParameters.Builder() MetadataImage image = new MetadataImageBuilder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>() .addTopic(t1Uuid, t1Name, 2)
.setGroupId("share-group") .addTopic(t2Uuid, t2Name, 2)
.setTopicsData(List.of(
new TopicData<>(t1Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1))),
new TopicData<>(t2Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1)))
))
.build()
)
.build(); .build();
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(shareGroup);
assertTrue(params.isPresent());
assertEquals(expectedParameters.groupTopicPartitionData(), params.get().groupTopicPartitionData());
verify(image, times(1)).topics(); MetadataDelta delta = new MetadataDelta(image);
verify(shareGroup, times(1)).subscribedTopicNames(); context.groupMetadataManager.onNewMetadataImage(image, delta);
verify(shareGroup, times(1)).groupId();
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
Map.of()
)
);
context.commit();
Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
t1Uuid, Set.of(0, 1),
t2Uuid, Set.of(0, 1)
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(t1Uuid, t1Name, t2Uuid, t2Name)
)
);
List<CoordinatorRecord> records = new ArrayList<>();
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
verifyShareGroupDeleteRequest(
params,
expectedTopicPartitionMap,
groupId,
true
);
assertRecordsEquals(expectedRecords, records);
} }
@Test @Test
@ -20844,14 +20860,15 @@ public class GroupMetadataManagerTest {
new ShareGroupStatePartitionMetadataKey() new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId), .setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue() new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of( .setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Uuid) .setTopicId(t1Uuid)
.setTopicName("t1") .setTopicName(t1Name)
.setPartitions(List.of(0, 1)), .setPartitions(List.of(0, 1)),
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t2Uuid) .setTopicId(t2Uuid)
.setTopicName("t2") .setTopicName(t2Name)
.setPartitions(List.of(0, 1)) .setPartitions(List.of(0, 1))
)) ))
.setDeletingTopics(List.of()) .setDeletingTopics(List.of())
@ -21292,4 +21309,25 @@ public class GroupMetadataManagerTest {
assertTrue(initRequest.isEmpty()); assertTrue(initRequest.isEmpty());
} }
} }
private void verifyShareGroupDeleteRequest(
Optional<DeleteShareGroupStateParameters> deleteRequest,
Map<Uuid, Set<Integer>> expectedTopicPartitionsMap,
String groupId,
boolean shouldExist
) {
if (shouldExist) {
assertTrue(deleteRequest.isPresent());
DeleteShareGroupStateParameters request = deleteRequest.get();
assertEquals(groupId, request.groupTopicPartitionData().groupId());
Map<Uuid, Set<Integer>> actualTopicPartitionsMap = new HashMap<>();
for (TopicData<PartitionIdData> topicData : request.groupTopicPartitionData().topicsData()) {
actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>())
.addAll(topicData.partitions().stream().map(PartitionIdData::partition).toList());
}
assertEquals(expectedTopicPartitionsMap, actualTopicPartitionsMap);
} else {
assertTrue(deleteRequest.isEmpty());
}
}
} }

View File

@ -515,6 +515,16 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition()); SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
if (!shareStateMap.containsKey(key)) {
log.warn("Attempted to delete non-existent share partition {}.", key);
return new CoordinatorResult<>(List.of(), new DeleteShareGroupStateResponseData().setResults(
List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
key.partition()))
))
));
}
CoordinatorRecord record = generateTombstoneRecord(key); CoordinatorRecord record = generateTombstoneRecord(key);
// build successful response if record is correctly created // build successful response if record is correctly created
DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults( DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults(

View File

@ -994,9 +994,7 @@ class ShareCoordinatorShardTest {
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION))))); .setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); // Apply a record to the state machine so that delete can be verified.
// apply a record in to verify delete
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, GROUP_ID,
TOPIC_ID, TOPIC_ID,
@ -1021,7 +1019,9 @@ class ShareCoordinatorShardTest {
assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
// apply tombstone CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
// Apply tombstone.
shard.replay(0L, 0L, (short) 0, result.records().get(0)); shard.replay(0L, 0L, (short) 0, result.records().get(0));
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
@ -1039,7 +1039,7 @@ class ShareCoordinatorShardTest {
} }
@Test @Test
public void testDeleteStateFirstRecordDeleteSuccess() { public void testDeleteStateUnintializedRecord() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
@ -1057,21 +1057,10 @@ class ShareCoordinatorShardTest {
assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
// apply tombstone
shard.replay(0L, 0L, (short) 0, result.records().get(0));
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = List.of(
ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
GROUP_ID, TOPIC_ID, PARTITION)
);
assertEquals(expectedData, result.response()); assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records()); assertEquals(List.of(), result.records());
assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
} }
@Test @Test