mirror of https://github.com/apache/kafka.git
KAFKA-18629: Delete share group state RPC group coordinator impl. [3/N] (#18848)
* In this PR, we have added GC side impl to call the delete state share coord RPC using the persister. * We will be using the existing `GroupCoordinatorService.deleteGroups`. The logic will be modified as follows: * After sanitization, we will call a new `runtime.scheduleWriteOperation` (not read for consistency) with callback `GroupCoordinatorShard.sharePartitions`. This will return a Map of share partitions of the groups which are of SHARE type. We need to pass all groups as WE CANNOT DETERMINE the type of the group in the service class. * Then using the map we will create requests which could be passed to the persister and make the appropriate calls. * Once this future completes, we will continue with the existing flow of group deletion. * If the group under inspection is not share group - the read callback should return an empty map. * Tests have been added wherever applicable. Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
d31cbf59de
commit
c2cb543a1e
|
@ -73,6 +73,7 @@
|
|||
<allow pkg="org.apache.kafka.coordinator.common" />
|
||||
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
|
||||
<allow pkg="com.google.re2j" />
|
||||
<allow pkg="org.apache.kafka.metadata" />
|
||||
<subpackage name="metrics">
|
||||
<allow pkg="com.yammer.metrics"/>
|
||||
<allow pkg="org.HdrHistogram" />
|
||||
|
|
|
@ -80,8 +80,13 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
|
|||
import org.apache.kafka.image.MetadataDelta;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.server.record.BrokerCompressionType;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
|
||||
import org.apache.kafka.server.share.persister.PartitionErrorData;
|
||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||
import org.apache.kafka.server.share.persister.Persister;
|
||||
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
|
||||
import org.apache.kafka.server.share.persister.TopicData;
|
||||
import org.apache.kafka.server.util.FutureUtils;
|
||||
import org.apache.kafka.server.util.timer.Timer;
|
||||
|
||||
|
@ -91,11 +96,13 @@ import java.time.Duration;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -109,6 +116,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
|
|||
/**
|
||||
* The group coordinator service.
|
||||
*/
|
||||
@SuppressWarnings({"ClassDataAbstractionCoupling"})
|
||||
public class GroupCoordinatorService implements GroupCoordinator {
|
||||
|
||||
public static class Builder {
|
||||
|
@ -823,20 +831,27 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
});
|
||||
|
||||
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
runtime.scheduleWriteOperation(
|
||||
"delete-groups",
|
||||
topicPartition,
|
||||
Duration.ofMillis(config.offsetCommitTimeoutMs()),
|
||||
coordinator -> coordinator.deleteGroups(context, groupList)
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"delete-groups",
|
||||
groupList,
|
||||
exception,
|
||||
(error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
|
||||
log
|
||||
));
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
List<String> retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection);
|
||||
if (retainedGroupIds.isEmpty()) {
|
||||
return CompletableFuture.completedFuture(collection);
|
||||
}
|
||||
|
||||
return handleDeleteGroups(context, topicPartition, retainedGroupIds)
|
||||
.whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate())))
|
||||
.thenApply(__ -> collection);
|
||||
});
|
||||
// deleteShareGroups has its own exceptionally block, so we don't need one here.
|
||||
|
||||
// This future object has the following stages:
|
||||
// - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
|
||||
// method is invoked, and it returns request objects for each valid share group passed to it.
|
||||
// - Then the requests are passed to the persister.deleteState method one at a time. The results
|
||||
// are collated as a Map of groupId -> persister errors
|
||||
// - The above map is then used to decide whether to invoke the group coordinator delete groups logic
|
||||
// - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
|
||||
// TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic
|
||||
futures.add(future);
|
||||
});
|
||||
|
||||
|
@ -846,6 +861,152 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
(accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
|
||||
}
|
||||
|
||||
private List<String> deleteCandidateGroupIds(
|
||||
Map<String, Errors> groupErrMap,
|
||||
List<String> groupList,
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection collection
|
||||
) {
|
||||
List<String> errGroupIds = new ArrayList<>();
|
||||
groupErrMap.forEach((groupId, error) -> {
|
||||
if (error.code() != Errors.NONE.code()) {
|
||||
log.error("Error deleting share group {} due to error {}", groupId, error);
|
||||
errGroupIds.add(groupId);
|
||||
collection.add(
|
||||
new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId(groupId)
|
||||
.setErrorCode(error.code())
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
Set<String> groupSet = new HashSet<>(groupList);
|
||||
// Remove all share group ids which have errored out
|
||||
// when deleting with persister.
|
||||
groupSet.removeAll(errGroupIds);
|
||||
|
||||
// Let us invoke the standard procedure of any non-share
|
||||
// groups or successfully deleted share groups remaining.
|
||||
return groupSet.stream().toList();
|
||||
}
|
||||
|
||||
private CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> handleDeleteGroups(
|
||||
RequestContext context,
|
||||
TopicPartition topicPartition,
|
||||
List<String> groupIds
|
||||
) {
|
||||
return runtime.scheduleWriteOperation(
|
||||
"delete-groups",
|
||||
topicPartition,
|
||||
Duration.ofMillis(config.offsetCommitTimeoutMs()),
|
||||
coordinator -> coordinator.deleteGroups(context, groupIds)
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"delete-groups",
|
||||
groupIds,
|
||||
exception,
|
||||
(error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupIds, error),
|
||||
log
|
||||
));
|
||||
}
|
||||
|
||||
private CompletableFuture<Map<String, Errors>> deleteShareGroups(
|
||||
TopicPartition topicPartition,
|
||||
List<String> groupList
|
||||
) {
|
||||
// topicPartition refers to internal topic __consumer_offsets
|
||||
return runtime.scheduleWriteOperation(
|
||||
"delete-share-groups",
|
||||
topicPartition,
|
||||
Duration.ofMillis(config.offsetCommitTimeoutMs()),
|
||||
coordinator -> coordinator.sharePartitionDeleteRequests(groupList)
|
||||
).thenCompose(
|
||||
this::performShareGroupsDeletion
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"delete-share-groups",
|
||||
groupList,
|
||||
exception,
|
||||
(error, __) -> {
|
||||
Map<String, Errors> errors = new HashMap<>();
|
||||
groupList.forEach(group -> errors.put(group, error));
|
||||
return errors;
|
||||
},
|
||||
log
|
||||
));
|
||||
}
|
||||
|
||||
private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
|
||||
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> deleteRequests
|
||||
) {
|
||||
List<CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>(deleteRequests.size());
|
||||
Map<String, Errors> errorMap = new HashMap<>();
|
||||
deleteRequests.forEach((groupId, valPair) -> {
|
||||
if (valPair.getValue() == Errors.NONE) {
|
||||
futures.add(deleteShareGroup(valPair.getKey()));
|
||||
} else {
|
||||
errorMap.put(groupId, valPair.getValue());
|
||||
}
|
||||
});
|
||||
|
||||
return persisterDeleteToGroupIdErrorMap(futures)
|
||||
.thenApply(respErrMap -> {
|
||||
errorMap.putAll(respErrMap);
|
||||
return errorMap;
|
||||
});
|
||||
}
|
||||
|
||||
private CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>> deleteShareGroup(
|
||||
DeleteShareGroupStateParameters deleteRequest
|
||||
) {
|
||||
String groupId = deleteRequest.groupTopicPartitionData().groupId();
|
||||
return persister.deleteState(deleteRequest)
|
||||
.thenCompose(result -> CompletableFuture.completedFuture(Map.entry(groupId, result)))
|
||||
.exceptionally(exception -> {
|
||||
// In case the deleteState call fails,
|
||||
// we should construct the appropriate response here
|
||||
// so that the subsequent callbacks don't see runtime exceptions.
|
||||
log.error("Unable to delete share group partition(s) - {} using request {}", groupId, deleteRequest, exception);
|
||||
List<TopicData<PartitionErrorData>> respTopicData = deleteRequest.groupTopicPartitionData().topicsData().stream()
|
||||
.map(reqTopicData -> new TopicData<>(
|
||||
reqTopicData.topicId(),
|
||||
reqTopicData.partitions().stream()
|
||||
.map(reqPartData -> {
|
||||
Errors err = Errors.forException(exception);
|
||||
return PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(), err.message());
|
||||
})
|
||||
.toList()
|
||||
))
|
||||
.toList();
|
||||
|
||||
return Map.entry(groupId, new DeleteShareGroupStateResult.Builder()
|
||||
.setTopicsData(respTopicData)
|
||||
.build()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap(
|
||||
List<CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>>> futures
|
||||
) {
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[]{})).thenCompose(v -> {
|
||||
Map<String, Errors> groupIds = new HashMap<>();
|
||||
for (CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>> future : futures) {
|
||||
Map.Entry<String, DeleteShareGroupStateResult> entry = future.getNow(null); // safe as within allOff
|
||||
groupIds.putIfAbsent(entry.getKey(), Errors.NONE);
|
||||
for (TopicData<PartitionErrorData> topicData : entry.getValue().topicsData()) {
|
||||
Optional<PartitionErrorData> errItem = topicData.partitions().stream()
|
||||
.filter(errData -> errData.errorCode() != Errors.NONE.code())
|
||||
.findAny();
|
||||
|
||||
errItem.ifPresent(val -> {
|
||||
log.error("Received error while deleting share group {} - {}", entry.getKey(), val);
|
||||
groupIds.put(entry.getKey(), Errors.forCode(val.errorCode()));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return CompletableFuture.completedFuture(groupIds);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
|
||||
*/
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group;
|
|||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.ApiException;
|
||||
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
||||
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
|
||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
||||
|
@ -107,16 +109,20 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
|
|||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
|
||||
import org.apache.kafka.image.MetadataDelta;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -490,6 +496,33 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
return new CoordinatorResult<>(records, resultCollection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters}
|
||||
* and any ERRORS while building the request corresponding
|
||||
* to the valid share groups passed as the input.
|
||||
* <p></p>
|
||||
* The groupIds are first filtered by type to restrict the list to share groups.
|
||||
* @param groupIds - A list of groupIds as string
|
||||
* @return {@link CoordinatorResult} object always containing empty records and Map keyed on groupId and value pair (req, error)
|
||||
*/
|
||||
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) {
|
||||
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>();
|
||||
for (String groupId : groupIds) {
|
||||
try {
|
||||
ShareGroup group = groupMetadataManager.shareGroup(groupId);
|
||||
group.validateDeleteGroup();
|
||||
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group)
|
||||
.ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE)));
|
||||
} catch (GroupIdNotFoundException exception) {
|
||||
log.debug("GroupId {} not found as a share group.", groupId);
|
||||
} catch (GroupNotEmptyException exception) {
|
||||
log.debug("Share group {} is not empty.", groupId);
|
||||
responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception)));
|
||||
}
|
||||
}
|
||||
return new CoordinatorResult<>(List.of(), responseMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch offsets for a given set of partitions and a given group.
|
||||
*
|
||||
|
|
|
@ -140,6 +140,12 @@ import org.apache.kafka.image.MetadataDelta;
|
|||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.image.TopicImage;
|
||||
import org.apache.kafka.image.TopicsDelta;
|
||||
import org.apache.kafka.image.TopicsImage;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
|
||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||
import org.apache.kafka.server.share.persister.PartitionIdData;
|
||||
import org.apache.kafka.server.share.persister.TopicData;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
import org.apache.kafka.timeline.TimelineHashMap;
|
||||
import org.apache.kafka.timeline.TimelineHashSet;
|
||||
|
@ -6855,6 +6861,45 @@ public class GroupMetadataManager {
|
|||
group.createGroupTombstoneRecords(records);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an optional of delete share group request object to be used with the persister.
|
||||
* Empty if no subscribed topics or if the share group is empty.
|
||||
* @param shareGroup - A share group
|
||||
* @return Optional of object representing the share group state delete request.
|
||||
*/
|
||||
public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
|
||||
TopicsImage topicsImage = metadataImage.topics();
|
||||
Set<String> subscribedTopics = shareGroup.subscribedTopicNames().keySet();
|
||||
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(subscribedTopics.size());
|
||||
|
||||
for (String topic : subscribedTopics) {
|
||||
TopicImage topicImage = topicsImage.getTopic(topic);
|
||||
topicDataList.add(
|
||||
new TopicData<>(
|
||||
topicImage.id(),
|
||||
topicImage.partitions().keySet().stream()
|
||||
.map(PartitionFactory::newPartitionIdData)
|
||||
.toList()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (topicDataList.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(
|
||||
new DeleteShareGroupStateParameters.Builder()
|
||||
.setGroupTopicPartitionData(
|
||||
new GroupTopicPartitionData.Builder<PartitionIdData>()
|
||||
.setGroupId(shareGroup.groupId())
|
||||
.setTopicsData(topicDataList)
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the DeleteGroups request.
|
||||
*
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.kafka.common.Uuid;
|
|||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
|
||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
||||
import org.apache.kafka.common.errors.InvalidFetchSizeException;
|
||||
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||
import org.apache.kafka.common.errors.KafkaStorageException;
|
||||
|
@ -81,11 +82,16 @@ import org.apache.kafka.image.MetadataImage;
|
|||
import org.apache.kafka.image.TopicsImage;
|
||||
import org.apache.kafka.server.record.BrokerCompressionType;
|
||||
import org.apache.kafka.server.share.persister.DefaultStatePersister;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
|
||||
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
|
||||
import org.apache.kafka.server.share.persister.NoOpStatePersister;
|
||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||
import org.apache.kafka.server.share.persister.PartitionIdData;
|
||||
import org.apache.kafka.server.share.persister.Persister;
|
||||
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
|
||||
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
|
||||
import org.apache.kafka.server.share.persister.TopicData;
|
||||
import org.apache.kafka.server.util.FutureUtils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -115,6 +121,7 @@ import java.util.stream.Stream;
|
|||
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
|
||||
import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
|
||||
import static org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager;
|
||||
import static org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters.EMPTY_PARAMS;
|
||||
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
@ -122,6 +129,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -1594,6 +1602,13 @@ public class GroupCoordinatorServiceTest {
|
|||
result1.duplicate()
|
||||
));
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(Map.of()));
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
|
||||
|
@ -1627,6 +1642,358 @@ public class GroupCoordinatorServiceTest {
|
|||
assertEquals(expectedResultCollection, future.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteWithShareGroups() throws Exception {
|
||||
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||
Persister persister = mock(Persister.class);
|
||||
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
|
||||
.setConfig(createConfig())
|
||||
.setRuntime(runtime)
|
||||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.setPersister(persister)
|
||||
.build();
|
||||
service.startup(() -> 3);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group
|
||||
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-1");
|
||||
resultCollection1.add(result1);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// non-share group
|
||||
DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("group-id-2");
|
||||
resultCollection2.add(result2);
|
||||
|
||||
// null
|
||||
DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId(null)
|
||||
.setErrorCode(Errors.INVALID_GROUP_ID.code());
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
expectedResultCollection.addAll(List.of(
|
||||
result3.duplicate(),
|
||||
result2.duplicate(),
|
||||
result1.duplicate()
|
||||
));
|
||||
|
||||
Uuid shareGroupTopicId = Uuid.randomUuid();
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE))
|
||||
)).thenReturn(CompletableFuture.completedFuture(Map.of())); // non-share group
|
||||
|
||||
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
|
||||
new DeleteShareGroupStateResult.Builder()
|
||||
.setTopicsData(List.of(
|
||||
new TopicData<>(
|
||||
shareGroupTopicId,
|
||||
List.of(
|
||||
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()),
|
||||
PartitionFactory.newPartitionErrorData(1, Errors.NONE.code(), Errors.NONE.message())
|
||||
))
|
||||
))
|
||||
.build()
|
||||
));
|
||||
|
||||
// share-group-id-1
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(resultCollection1));
|
||||
|
||||
// group-id-2
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(resultCollection2));
|
||||
|
||||
List<String> groupIds = Arrays.asList("share-group-id-1", "group-id-2", null);
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
|
||||
|
||||
future.getNow(null);
|
||||
assertEquals(expectedResultCollection, future.get());
|
||||
verify(persister, times(1)).deleteState(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteShareGroupPersisterError() throws Exception {
|
||||
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||
Persister persister = mock(Persister.class);
|
||||
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
|
||||
.setConfig(createConfig())
|
||||
.setRuntime(runtime)
|
||||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.setPersister(persister)
|
||||
.build();
|
||||
service.startup(() -> 3);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group err
|
||||
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-1")
|
||||
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
|
||||
resultCollection1.add(result1);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group success
|
||||
DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-2");
|
||||
resultCollection2.add(result2);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
expectedResultCollection.addAll(Arrays.asList(
|
||||
result1.duplicate(),
|
||||
result2.duplicate()));
|
||||
|
||||
Uuid shareGroupTopicId = Uuid.randomUuid();
|
||||
Uuid shareGroupTopicId2 = Uuid.randomUuid();
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE))
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
Map.of("share-group-id-2", Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, List.of(0, 1)), Errors.NONE))
|
||||
));
|
||||
|
||||
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
|
||||
new DeleteShareGroupStateResult.Builder()
|
||||
.setTopicsData(List.of(
|
||||
new TopicData<>(
|
||||
shareGroupTopicId,
|
||||
List.of(
|
||||
PartitionFactory.newPartitionErrorData(0, Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
|
||||
PartitionFactory.newPartitionErrorData(1, Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
|
||||
))
|
||||
))
|
||||
.build()
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
new DeleteShareGroupStateResult.Builder()
|
||||
.setTopicsData(List.of(
|
||||
new TopicData<>(
|
||||
shareGroupTopicId2,
|
||||
List.of(
|
||||
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()),
|
||||
PartitionFactory.newPartitionErrorData(1, Errors.NONE.code(), Errors.NONE.message())
|
||||
))
|
||||
))
|
||||
.build()
|
||||
));
|
||||
|
||||
// share-group-id-1
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(resultCollection1));
|
||||
|
||||
// share-group-id-2
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(resultCollection2));
|
||||
|
||||
List<String> groupIds = List.of("share-group-id-1", "share-group-id-2");
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
|
||||
|
||||
future.getNow(null);
|
||||
assertEquals(expectedResultCollection, future.get());
|
||||
verify(persister, times(2)).deleteState(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteShareGroupCoordinatorShareSpecificWriteError() throws Exception {
|
||||
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||
Persister persister = mock(Persister.class);
|
||||
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
|
||||
.setConfig(createConfig())
|
||||
.setRuntime(runtime)
|
||||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.setPersister(persister)
|
||||
.build();
|
||||
service.startup(() -> 3);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group err
|
||||
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-1")
|
||||
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
|
||||
resultCollection1.add(result1);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
expectedResultCollection.add(
|
||||
result1.duplicate()
|
||||
);
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.failedFuture(
|
||||
Errors.COORDINATOR_NOT_AVAILABLE.exception()
|
||||
));
|
||||
|
||||
// share-group-id-1
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(resultCollection1));
|
||||
|
||||
List<String> groupIds = List.of("share-group-id-1");
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
|
||||
|
||||
future.getNow(null);
|
||||
assertEquals(expectedResultCollection, future.get());
|
||||
verify(persister, times(0)).deleteState(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteShareGroupNotEmptyError() throws Exception {
|
||||
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||
Persister persister = mock(Persister.class);
|
||||
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
|
||||
.setConfig(createConfig())
|
||||
.setRuntime(runtime)
|
||||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.setPersister(persister)
|
||||
.build();
|
||||
service.startup(() -> 3);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group err
|
||||
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-1")
|
||||
.setErrorCode(Errors.forException(new GroupNotEmptyException("bad stuff")).code());
|
||||
resultCollection1.add(result1);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
expectedResultCollection.add(
|
||||
result1.duplicate()
|
||||
);
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
Map.of("share-group-id-1", Map.entry(EMPTY_PARAMS, Errors.forException(new GroupNotEmptyException("bad stuff"))))
|
||||
));
|
||||
|
||||
List<String> groupIds = List.of("share-group-id-1");
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
|
||||
|
||||
future.getNow(null);
|
||||
assertEquals(expectedResultCollection, future.get());
|
||||
// If there is error creating share group delete req
|
||||
// neither persister call nor general delete groups call is made.
|
||||
verify(persister, times(0)).deleteState(any());
|
||||
verify(runtime, times(0)).scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteShareGroupCoordinatorGeneralWriteError() throws Exception {
|
||||
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||
Persister persister = mock(Persister.class);
|
||||
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
|
||||
.setConfig(createConfig())
|
||||
.setRuntime(runtime)
|
||||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.setPersister(persister)
|
||||
.build();
|
||||
service.startup(() -> 3);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
// share group err
|
||||
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
|
||||
.setGroupId("share-group-id-1")
|
||||
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
|
||||
resultCollection1.add(result1);
|
||||
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
|
||||
new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
expectedResultCollection.add(
|
||||
result1.duplicate()
|
||||
);
|
||||
|
||||
Uuid shareGroupTopicId = Uuid.randomUuid();
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(
|
||||
Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE))
|
||||
));
|
||||
|
||||
// share-group-id-1
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.eq(Duration.ofMillis(5000)),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()));
|
||||
|
||||
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder()
|
||||
.setTopicsData(List.of(
|
||||
new TopicData<>(
|
||||
shareGroupTopicId,
|
||||
List.of(
|
||||
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()),
|
||||
PartitionFactory.newPartitionErrorData(1, Errors.NONE.code(), Errors.NONE.message())
|
||||
))
|
||||
))
|
||||
.build()
|
||||
));
|
||||
|
||||
List<String> groupIds = List.of("share-group-id-1");
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
|
||||
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
|
||||
|
||||
future.getNow(null);
|
||||
assertEquals(expectedResultCollection, future.get());
|
||||
verify(persister, times(1)).deleteState(any());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("testGroupHeartbeatWithExceptionSource")
|
||||
public void testDeleteGroupsWithException(
|
||||
|
@ -1640,6 +2007,13 @@ public class GroupCoordinatorServiceTest {
|
|||
.setMetrics(mock(GroupCoordinatorMetrics.class))
|
||||
.build(true);
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-share-groups"),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any(),
|
||||
ArgumentMatchers.any()
|
||||
)).thenReturn(CompletableFuture.completedFuture(Map.of()));
|
||||
|
||||
when(runtime.scheduleWriteOperation(
|
||||
ArgumentMatchers.eq("delete-groups"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
|
||||
|
@ -2532,4 +2906,17 @@ public class GroupCoordinatorServiceTest {
|
|||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
private static DeleteShareGroupStateParameters createDeleteShareRequest(String groupId, Uuid topic, List<Integer> partitions) {
|
||||
TopicData<PartitionIdData> topicData = new TopicData<>(topic,
|
||||
partitions.stream().map(PartitionFactory::newPartitionIdData).toList()
|
||||
);
|
||||
|
||||
return new DeleteShareGroupStateParameters.Builder()
|
||||
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
|
||||
.setGroupId(groupId)
|
||||
.setTopicsData(List.of(topicData))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
package org.apache.kafka.coordinator.group;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
||||
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
||||
import org.apache.kafka.common.message.DeleteGroupsResponseData;
|
||||
|
@ -77,19 +80,29 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment
|
|||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
|
||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||
import org.apache.kafka.server.share.persister.PartitionIdData;
|
||||
import org.apache.kafka.server.share.persister.TopicData;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
|
||||
|
@ -1744,4 +1757,106 @@ public class GroupCoordinatorShardTest {
|
|||
|
||||
verify(groupMetadataManager, times(1)).replay(key, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharePartitionDeleteRequests() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
ShareGroup shareGroup = new ShareGroup(new SnapshotRegistry(mock(LogContext.class)), "share-group");
|
||||
|
||||
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
|
||||
when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class);
|
||||
|
||||
TopicData<PartitionIdData> topicData = new TopicData<>(Uuid.randomUuid(),
|
||||
List.of(
|
||||
PartitionFactory.newPartitionIdData(0),
|
||||
PartitionFactory.newPartitionIdData(1)
|
||||
));
|
||||
|
||||
DeleteShareGroupStateParameters params = new DeleteShareGroupStateParameters.Builder()
|
||||
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
|
||||
.setGroupId("share-group")
|
||||
.setTopicsData(List.of(topicData))
|
||||
.build())
|
||||
.build();
|
||||
|
||||
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(shareGroup))).thenReturn(Optional.of(params));
|
||||
|
||||
CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult =
|
||||
new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(params, Errors.NONE)));
|
||||
|
||||
|
||||
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group", "non-share-group")));
|
||||
verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
|
||||
verify(groupMetadataManager, times(1)).shareGroup(eq("non-share-group"));
|
||||
verify(groupMetadataManager, times(1)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
|
||||
|
||||
// empty list
|
||||
Mockito.reset(groupMetadataManager);
|
||||
expectedResult = new CoordinatorResult<>(List.of(), Map.of());
|
||||
assertEquals(
|
||||
expectedResult,
|
||||
coordinator.sharePartitionDeleteRequests(List.of())
|
||||
);
|
||||
|
||||
verify(groupMetadataManager, times(0)).group(eq("share-group"));
|
||||
verify(groupMetadataManager, times(0)).group(eq("non-share-group"));
|
||||
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharePartitionDeleteRequestsNonEmptyShareGroup() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
ShareGroup shareGroup = mock(ShareGroup.class);
|
||||
doThrow(new GroupNotEmptyException("bad stuff")).when(shareGroup).validateDeleteGroup();
|
||||
|
||||
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
|
||||
|
||||
CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> expectedResult =
|
||||
new CoordinatorResult<>(List.of(), Map.of("share-group", Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS,
|
||||
Errors.forException(new GroupNotEmptyException("bad stuff")))
|
||||
));
|
||||
assertEquals(expectedResult, coordinator.sharePartitionDeleteRequests(List.of("share-group")));
|
||||
verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
|
||||
// Not called because of NON-EMPTY group.
|
||||
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
|
||||
|
||||
// empty list
|
||||
Mockito.reset(groupMetadataManager);
|
||||
expectedResult = new CoordinatorResult<>(List.of(), Map.of());
|
||||
assertEquals(
|
||||
expectedResult,
|
||||
coordinator.sharePartitionDeleteRequests(List.of())
|
||||
);
|
||||
|
||||
verify(groupMetadataManager, times(0)).group(eq("share-group"));
|
||||
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
|
|||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberState;
|
||||
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
|
||||
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
|
||||
|
@ -104,6 +105,11 @@ import org.apache.kafka.coordinator.group.streams.TasksTuple;
|
|||
import org.apache.kafka.image.MetadataDelta;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.image.MetadataProvenance;
|
||||
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
|
||||
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
|
||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||
import org.apache.kafka.server.share.persister.PartitionIdData;
|
||||
import org.apache.kafka.server.share.persister.TopicData;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
|
@ -115,6 +121,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
@ -170,6 +177,7 @@ import static org.junit.jupiter.api.Assertions.fail;
|
|||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -16859,6 +16867,50 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharePartitionDeleteRequest() {
|
||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||
assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
|
||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
|
||||
.build();
|
||||
|
||||
Uuid t1Uuid = Uuid.randomUuid();
|
||||
Uuid t2Uuid = Uuid.randomUuid();
|
||||
MetadataImage image = spy(new MetadataImageBuilder()
|
||||
.addTopic(t1Uuid, "t1", 2)
|
||||
.addTopic(t2Uuid, "t2", 2)
|
||||
.build());
|
||||
|
||||
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
|
||||
|
||||
ShareGroup shareGroup = mock(ShareGroup.class);
|
||||
Map<String, SubscriptionCount> topicMap = new LinkedHashMap<>();
|
||||
topicMap.put("t1", mock(SubscriptionCount.class));
|
||||
topicMap.put("t2", mock(SubscriptionCount.class));
|
||||
when(shareGroup.subscribedTopicNames()).thenReturn(topicMap);
|
||||
when(shareGroup.groupId()).thenReturn("share-group");
|
||||
when(shareGroup.isEmpty()).thenReturn(false);
|
||||
|
||||
DeleteShareGroupStateParameters expectedParameters = new DeleteShareGroupStateParameters.Builder()
|
||||
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
|
||||
.setGroupId("share-group")
|
||||
.setTopicsData(List.of(
|
||||
new TopicData<>(t1Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1))),
|
||||
new TopicData<>(t2Uuid, List.of(PartitionFactory.newPartitionIdData(0), PartitionFactory.newPartitionIdData(1)))
|
||||
))
|
||||
.build()
|
||||
)
|
||||
.build();
|
||||
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(shareGroup);
|
||||
assertTrue(params.isPresent());
|
||||
assertEquals(expectedParameters.groupTopicPartitionData(), params.get().groupTopicPartitionData());
|
||||
|
||||
verify(image, times(1)).topics();
|
||||
verify(shareGroup, times(1)).subscribedTopicNames();
|
||||
verify(shareGroup, times(1)).groupId();
|
||||
}
|
||||
|
||||
private static void checkJoinGroupResponse(
|
||||
JoinGroupResponseData expectedResponse,
|
||||
JoinGroupResponseData actualResponse,
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.kafka.server.share.persister;
|
|||
|
||||
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -27,6 +28,12 @@ import java.util.stream.Collectors;
|
|||
public class DeleteShareGroupStateParameters implements PersisterParameters {
|
||||
private final GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData;
|
||||
|
||||
public static final DeleteShareGroupStateParameters EMPTY_PARAMS = new DeleteShareGroupStateParameters(new GroupTopicPartitionData.Builder<PartitionIdData>()
|
||||
.setGroupId("")
|
||||
.setTopicsData(List.of())
|
||||
.build()
|
||||
);
|
||||
|
||||
private DeleteShareGroupStateParameters(GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData) {
|
||||
this.groupTopicPartitionData = groupTopicPartitionData;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,10 @@ import java.util.stream.Collectors;
|
|||
public class DeleteShareGroupStateResult implements PersisterResult {
|
||||
private final List<TopicData<PartitionErrorData>> topicsData;
|
||||
|
||||
public static final DeleteShareGroupStateResult EMPTY_RESULT = new Builder()
|
||||
.setTopicsData(List.of())
|
||||
.build();
|
||||
|
||||
private DeleteShareGroupStateResult(List<TopicData<PartitionErrorData>> topicsData) {
|
||||
this.topicsData = topicsData;
|
||||
}
|
||||
|
@ -38,12 +42,12 @@ public class DeleteShareGroupStateResult implements PersisterResult {
|
|||
|
||||
public static DeleteShareGroupStateResult from(DeleteShareGroupStateResponseData data) {
|
||||
return new Builder()
|
||||
.setTopicsData(data.results().stream()
|
||||
.map(deleteStateResult -> new TopicData<>(deleteStateResult.topicId(), deleteStateResult.partitions().stream()
|
||||
.map(partitionResult -> PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage()))
|
||||
.collect(Collectors.toList())))
|
||||
.collect(Collectors.toList()))
|
||||
.build();
|
||||
.setTopicsData(data.results().stream()
|
||||
.map(deleteStateResult -> new TopicData<>(deleteStateResult.topicId(), deleteStateResult.partitions().stream()
|
||||
.map(partitionResult -> PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage()))
|
||||
.collect(Collectors.toList())))
|
||||
.collect(Collectors.toList()))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
|
Loading…
Reference in New Issue