diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index c00502af48f..757bba87263 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData; -import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; @@ -1258,45 +1257,40 @@ public class GroupCoordinatorService implements GroupCoordinator { }); } - private void populateDeleteShareGroupOffsetsFuture( - DeleteShareGroupOffsetsRequestData requestData, - CompletableFuture future, - Map requestTopicIdToNameMapping, - List deleteShareGroupStateRequestTopicsData, - List deleteShareGroupOffsetsResponseTopicList - + private CompletableFuture persistDeleteShareGroupOffsets( + DeleteShareGroupStateParameters deleteStateRequestParameters, + List errorTopicResponseList ) { - DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() - .setGroupId(requestData.groupId()) - .setTopics(deleteShareGroupStateRequestTopicsData); - - persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)) - .whenComplete((result, error) -> { - if (error != null) { - log.error("Failed to delete share group state"); - future.completeExceptionally(error); - return; - } + return persister.deleteState(deleteStateRequestParameters) + .thenCompose(result -> { if (result == null || result.topicsData() == null) { log.error("Result is null for the delete share group state"); - future.completeExceptionally(new IllegalStateException("Result is null for the delete share group state")); - return; + Exception exception = new IllegalStateException("Result is null for the delete share group state"); + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception)) + ); } result.topicsData().forEach(topicData -> - deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() - .setTopicId(topicData.topicId()) - .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId())) - .setPartitions(topicData.partitions().stream().map( - partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() - .setPartitionIndex(partitionData.partition()) - .setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message()) - .setErrorCode(partitionData.errorCode()) - ).toList()) - )); + errorTopicResponseList.add( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicId(topicData.topicId()) + .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId())) + .setPartitions(topicData.partitions().stream().map( + partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partitionData.partition()) + .setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message()) + .setErrorCode(partitionData.errorCode()) + ).toList()) + ) + ); - future.complete( + return CompletableFuture.completedFuture( new DeleteShareGroupOffsetsResponseData() - .setResponses(deleteShareGroupOffsetsResponseTopicList)); + .setResponses(errorTopicResponseList) + ); + }).exceptionally(throwable -> { + log.error("Failed to delete share group state"); + return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)); }); } @@ -1590,83 +1584,53 @@ public class GroupCoordinatorService implements GroupCoordinator { DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID)); } - Map requestTopicIdToNameMapping = new HashMap<>(); - List deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); - List deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); - - requestData.topics().forEach(topic -> { - Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); - if (topicId != null) { - requestTopicIdToNameMapping.put(topicId, topic.topicName()); - deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() - .setTopicId(topicId) - .setPartitions( - topic.partitions().stream().map( - partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) - ).toList() - )); - } else { - deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() - .setTopicName(topic.topicName()) - .setPartitions(topic.partitions().stream().map( - partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() - .setPartitionIndex(partition) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) - ).toList())); - } - }); - - // If the request for the persister is empty, just complete the operation right away. - if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + if (requestData.topics() == null || requestData.topics().isEmpty()) { return CompletableFuture.completedFuture( new DeleteShareGroupOffsetsResponseData() - .setResponses(deleteShareGroupOffsetsResponseTopicList)); + ); } - CompletableFuture future = new CompletableFuture<>(); + return runtime.scheduleReadOperation( + "share-group-delete-offsets-request", + topicPartitionFor(groupId), + (coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData) + ) + .thenCompose(resultHolder -> { + if (resultHolder == null) { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId); + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR) + ); + } - TopicPartition topicPartition = topicPartitionFor(groupId); + if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) { + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData( + resultHolder.topLevelErrorCode(), + resultHolder.topLevelErrorMessage() + ) + ); + } - // This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group. - CompletableFuture> describeGroupFuture = - runtime.scheduleReadOperation( - "share-group-describe", - topicPartition, - (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset) - ).exceptionally(exception -> handleOperationException( - "share-group-describe", - List.of(groupId), - exception, - (error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error), - log - )); + List errorTopicResponseList = + resultHolder.errorTopicResponseList() == null ? new ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList()); - describeGroupFuture.whenComplete((groups, throwable) -> { - if (throwable != null) { - log.error("Failed to describe the share group {}", groupId, throwable); - future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable))); - } else if (groups == null || groups.isEmpty()) { - log.error("Describe share group resulted in empty response for group {}", groupId); - future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND)); - } else if (groups.get(0).errorCode() != Errors.NONE.code()) { - log.error("Failed to describe the share group {}", groupId); - future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(), groups.get(0).errorMessage())); - } else if (groups.get(0).members() != null && !groups.get(0).members().isEmpty()) { - log.error("Provided group {} is not empty", groupId); - future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP)); - } else { - populateDeleteShareGroupOffsetsFuture( - requestData, - future, - requestTopicIdToNameMapping, - deleteShareGroupStateRequestTopicsData, - deleteShareGroupOffsetsResponseTopicList + if (resultHolder.deleteStateRequestParameters() == null) { + return CompletableFuture.completedFuture( + new DeleteShareGroupOffsetsResponseData() + .setResponses(errorTopicResponseList) + ); + } + + return persistDeleteShareGroupOffsets( + resultHolder.deleteStateRequestParameters(), + errorTopicResponseList ); - } - }); - - return future; + }) + .exceptionally(throwable -> { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId, throwable); + return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)); + }); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 812641c9459..7f9fd576fd5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -26,6 +26,9 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -131,6 +134,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -290,6 +294,69 @@ public class GroupCoordinatorShard implements CoordinatorShard errorTopicResponseList; + private final DeleteShareGroupStateParameters deleteStateRequestParameters; + + DeleteShareGroupOffsetsResultHolder(short topLevelErrorCode, String topLevelErrorMessage) { + this(topLevelErrorCode, topLevelErrorMessage, null, null); + } + + DeleteShareGroupOffsetsResultHolder( + short topLevelErrorCode, + String topLevelErrorMessage, + List errorTopicResponseList + ) { + this(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, null); + } + + DeleteShareGroupOffsetsResultHolder( + short topLevelErrorCode, + String topLevelErrorMessage, + List errorTopicResponseList, + DeleteShareGroupStateParameters deleteStateRequestParameters + ) { + this.topLevelErrorCode = topLevelErrorCode; + this.topLevelErrorMessage = topLevelErrorMessage; + this.errorTopicResponseList = errorTopicResponseList; + this.deleteStateRequestParameters = deleteStateRequestParameters; + } + + public short topLevelErrorCode() { + return this.topLevelErrorCode; + } + + public String topLevelErrorMessage() { + return this.topLevelErrorMessage; + } + + public List errorTopicResponseList() { + return this.errorTopicResponseList; + } + + public DeleteShareGroupStateParameters deleteStateRequestParameters() { + return this.deleteStateRequestParameters; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteShareGroupOffsetsResultHolder other = (DeleteShareGroupOffsetsResultHolder) o; + return topLevelErrorCode == other.topLevelErrorCode && + Objects.equals(topLevelErrorMessage, other.topLevelErrorMessage) && + Objects.equals(errorTopicResponseList, other.errorTopicResponseList) && + Objects.equals(deleteStateRequestParameters, other.deleteStateRequestParameters); + } + + @Override + public int hashCode() { + return Objects.hash(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, deleteStateRequestParameters); + } + } + /** * The group/offsets expiration key to schedule a timer task. * @@ -613,6 +680,57 @@ public class GroupCoordinatorShard implements CoordinatorShard(records, responseMap); } + /** + * Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further + * 1. Checks whether the provided group is empty + * 2. Checks the requested topics are presented in the metadataImage + * 3. Checks the requested share partitions are initialized for the group + * + * @param groupId - The group ID + * @param requestData - The request data for DeleteShareGroupOffsetsRequest + * @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses + * and persister deleteState request parameters + */ + public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest( + String groupId, + DeleteShareGroupOffsetsRequestData requestData + ) { + try { + ShareGroup group = groupMetadataManager.shareGroup(groupId); + group.validateDeleteGroup(); + + List errorTopicResponseList = new ArrayList<>(); + List deleteShareGroupStateRequestTopicsData = + groupMetadataManager.sharePartitionsEligibleForOffsetDeletion( + groupId, + requestData, + errorTopicResponseList + ); + + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList); + } + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData); + + return new DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + errorTopicResponseList, + DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData) + ); + + } catch (GroupIdNotFoundException exception) { + log.error("groupId {} not found", groupId, exception); + return new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage()); + } catch (GroupNotEmptyException exception) { + log.error("Provided group {} is not empty", groupId); + return new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage()); + } + } + /** * Fetch offsets for a given set of partitions and a given group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f8a114d999f..2e2b02f9432 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -41,6 +41,9 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -8195,6 +8198,52 @@ public class GroupMetadataManager { ); } + /** + * Returns a list of delete share group state request topic objects to be used with the persister. + * @param groupId - group ID of the share group + * @param requestData - the request data for DeleteShareGroupOffsets request + * @param errorTopicResponseList - the list of topics not found in the metadata image + * @return List of objects representing the share group state delete request for topics. + */ + public List sharePartitionsEligibleForOffsetDeletion( + String groupId, + DeleteShareGroupOffsetsRequestData requestData, + List errorTopicResponseList + ) { + List deleteShareGroupStateRequestTopicsData = new ArrayList<>(); + + Map> initializedSharePartitions = initializedShareGroupPartitions(groupId); + requestData.topics().forEach(topic -> { + Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); + if (topicId != null) { + // A deleteState request to persister should only be sent with those topic partitions for which corresponding + // share partitions are initialized for the group. + if (initializedSharePartitions.containsKey(topicId)) { + List partitions = new ArrayList<>(); + topic.partitions().forEach(partition -> { + if (initializedSharePartitions.get(topicId).contains(partition)) { + partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)); + } + }); + deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(partitions)); + } + } else { + errorTopicResponseList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setPartitions(topic.partitions().stream().map( + partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ).collect(Collectors.toCollection(ArrayList::new)))); + } + }); + + return deleteShareGroupStateRequestTopicsData; + } + /** * Validates the DeleteGroups request. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 6b117a0ec45..01e74ea558a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -121,7 +121,6 @@ import org.mockito.ArgumentMatchers; import java.net.InetAddress; import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -3298,9 +3297,11 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); + String groupId = "share-group-id"; + int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) @@ -3317,14 +3318,30 @@ public class GroupCoordinatorServiceTest { .setErrorMessage(null)))) ); - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1"); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ) + ); when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); CompletableFuture future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); @@ -3343,16 +3360,18 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); + String groupId = "share-group-id"; + int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) )); DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() .setTopicId(TOPIC_ID) .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() @@ -3380,14 +3399,19 @@ public class GroupCoordinatorServiceTest { ) ); - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1"); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData) + ); when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData); DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData); @@ -3401,152 +3425,6 @@ public class GroupCoordinatorServiceTest { assertEquals(responseData, future.get()); } - @Test - public void testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws InterruptedException, ExecutionException { - CoordinatorRuntime runtime = mockRuntime(); - Persister persister = mock(DefaultStatePersister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(persister) - .build(true); - service.startup(() -> 1); - - int partition = 1; - DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName("badtopic") - .setPartitions(List.of(partition)) - )); - - DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() - .setResponses( - List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() - .setTopicName("badtopic") - .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() - .setPartitionIndex(partition) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))) - ); - - CompletableFuture future = - service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); - - assertEquals(responseData, future.get()); - } - - @Test - public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() { - CoordinatorRuntime runtime = mockRuntime(); - Persister persister = mock(DefaultStatePersister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(persister) - .build(true); - service.startup(() -> 1); - - int partition = 1; - DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName(TOPIC_NAME) - .setPartitions(List.of(partition)) - )); - - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1"); - - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); - - when(persister.deleteState(ArgumentMatchers.any())) - .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate delete share group state request"))); - - CompletableFuture future = - service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(Exception.class, future, "Unable to validate delete share group state request"); - } - - @Test - public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() { - CoordinatorRuntime runtime = mockRuntime(); - Persister persister = mock(DefaultStatePersister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(persister) - .build(true); - service.startup(() -> 1); - - int partition = 1; - DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName(TOPIC_NAME) - .setPartitions(List.of(partition)) - )); - - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1"); - - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); - - when(persister.deleteState(ArgumentMatchers.any())) - .thenReturn(CompletableFuture.completedFuture(null)); - - CompletableFuture future = - service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state"); - } - - @Test - public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() { - CoordinatorRuntime runtime = mockRuntime(); - Persister persister = mock(DefaultStatePersister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(persister) - .build(true); - service.startup(() -> 1); - - int partition = 1; - DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName(TOPIC_NAME) - .setPartitions(List.of(partition)) - )); - - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1"); - - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); - - DeleteShareGroupStateResult deleteShareGroupStateResult = - new DeleteShareGroupStateResult.Builder().setTopicsData(null).build(); - - when(persister.deleteState(ArgumentMatchers.any())) - .thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult)); - - CompletableFuture future = - service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state"); - } - @Test public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); @@ -3632,7 +3510,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testDeleteShareGroupOffsetsDescribeThrowsError() throws InterruptedException, ExecutionException { + public void testDeleteShareGroupOffsetsEmptyRequest() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() @@ -3642,9 +3520,33 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId("share-group-id"); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData(); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsRequestThrowsError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group-id"; + int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) @@ -3655,10 +3557,10 @@ public class GroupCoordinatorServiceTest { .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()); when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() - )).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())); + )).thenReturn(CompletableFuture.completedFuture(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()))); CompletableFuture future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); @@ -3667,7 +3569,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws InterruptedException, ExecutionException { + public void testDeleteShareGroupOffsetsRequestReturnsNull() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() @@ -3677,20 +3579,22 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); + String groupId = "share-group-id"; + int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) )); DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() - .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) - .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message()); + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()); when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(null)); @@ -3702,7 +3606,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws InterruptedException, ExecutionException { + public void testDeleteShareGroupOffsetsRequestReturnsGroupIdNotFound() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() @@ -3712,9 +3616,11 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); + String groupId = "share-group-id"; + int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) @@ -3724,11 +3630,19 @@ public class GroupCoordinatorServiceTest { .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message()); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.GROUP_ID_NOT_FOUND.code(), + Errors.GROUP_ID_NOT_FOUND.message(), + null, + null + ); + when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); CompletableFuture future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); @@ -3737,7 +3651,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testDeleteShareGroupOffsetsDescribeReturnsError() throws InterruptedException, ExecutionException { + public void testDeleteShareGroupOffsetsRequestReturnsGroupNotEmpty() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() @@ -3747,48 +3661,11 @@ public class GroupCoordinatorServiceTest { .build(true); service.startup(() -> 1); - int partition = 1; - DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName(TOPIC_NAME) - .setPartitions(List.of(partition)) - )); - - DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() - .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) - .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message()); - - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) - .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message()); - - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); - - CompletableFuture future = - service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); - - assertEquals(responseData, future.get()); - } - - @Test - public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws InterruptedException, ExecutionException { - CoordinatorRuntime runtime = mockRuntime(); - Persister persister = mock(DefaultStatePersister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(persister) - .build(true); - service.startup(() -> 1); + String groupId = "share-group-id"; int partition = 1; DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id") + .setGroupId(groupId) .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() .setTopicName(TOPIC_NAME) .setPartitions(List.of(partition)) @@ -3798,15 +3675,415 @@ public class GroupCoordinatorServiceTest { .setErrorCode(Errors.NON_EMPTY_GROUP.code()) .setErrorMessage(Errors.NON_EMPTY_GROUP.message()); - ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() - .setGroupId("share-group-id-1") - .setMembers(List.of(new ShareGroupDescribeResponseData.Member())); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NON_EMPTY_GROUP.code(), + Errors.NON_EMPTY_GROUP.message(), + null, + null + ); when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("share-group-describe"), + ArgumentMatchers.eq("share-group-delete-offsets-request"), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup))); + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsRequestReturnsNullParameters() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group-id"; + + int partition = 1; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData(); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + null, + null + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsRequestReturnsNullParametersWithErrorTopics() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String badTopicName = "bad-topic"; + Uuid badTopicId = Uuid.randomUuid(); + String groupId = "share-group-id"; + + int partition = 1; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(badTopicName) + .setPartitions(List.of(partition)) + )); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() + .setResponses(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(badTopicName) + .setTopicId(badTopicId) + .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + )))); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(badTopicName) + .setTopicId(badTopicId) + .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ))), + null + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group-id"; + + int partition = 1; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + Exception persisterException = new Exception("Unable to validate delete share group state request"); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() + .setErrorCode(Errors.forException(persisterException).code()) + .setErrorMessage(Errors.forException(persisterException).message()); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ) + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + when(persister.deleteState(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.failedFuture(persisterException)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group-id"; + + int partition = 1; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + Exception persisterException = new IllegalStateException("Result is null for the delete share group state"); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() + .setErrorCode(Errors.forException(persisterException).code()) + .setErrorMessage(Errors.forException(persisterException).message()); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ) + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + when(persister.deleteState(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group-id"; + + int partition = 1; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + Exception persisterException = new IllegalStateException("Result is null for the delete share group state"); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() + .setErrorCode(Errors.forException(persisterException).code()) + .setErrorMessage(Errors.forException(persisterException).message()); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ) + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + DeleteShareGroupStateResult deleteShareGroupStateResult = + new DeleteShareGroupStateResult.Builder().setTopicsData(null).build(); + + when(persister.deleteState(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult)); + + CompletableFuture future = + service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDeleteShareGroupOffsetsSuccessWithErrorTopicPartitions() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String badTopicName = "bad-topic"; + Uuid badTopicId = Uuid.randomUuid(); + String groupId = "share-group-id"; + + int partition = 1; + + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(badTopicName) + .setPartitions(List.of(partition)) + )); + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition))))); + + DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() + .setResponses( + List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(badTopicName) + .setTopicId(badTopicId) + .setPartitions(List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + )), + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + )) + ) + ); + + DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData() + .setResults( + List.of(new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null))) + ) + ); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(badTopicName) + .setTopicId(badTopicId) + .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ))), + DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData) + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-delete-offsets-request"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + + DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData); + DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData); + when(persister.deleteState( + ArgumentMatchers.eq(deleteShareGroupStateParameters) + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult)); CompletableFuture future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 16665f98bf2..25af1ca34a6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -23,6 +23,9 @@ import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; @@ -124,6 +127,7 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -1949,4 +1953,302 @@ public class GroupCoordinatorShardTest { verify(groupMetadataManager, times(0)).group(eq("share-group")); verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList()); } + + @Test + public void testShareGroupDeleteOffsetsRequestGroupNotFound() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + String groupId = "share-group"; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName("topic-1") + .setPartitions(List.of(0)) + )); + + GroupIdNotFoundException exception = new GroupIdNotFoundException("group Id not found"); + + doThrow(exception).when(groupMetadataManager).shareGroup(eq(groupId)); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage()); + + assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); + // Not called because of Group not found. + verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any()); + } + + @Test + public void testShareGroupDeleteOffsetsRequestNonEmptyShareGroup() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + String groupId = "share-group"; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName("topic-1") + .setPartitions(List.of(0)) + )); + + ShareGroup shareGroup = mock(ShareGroup.class); + GroupNotEmptyException exception = new GroupNotEmptyException("group is not empty"); + doThrow(exception).when(shareGroup).validateDeleteGroup(); + + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage()); + + assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); + // Not called because of Group not found. + verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any()); + } + + @Test + public void testShareGroupDeleteOffsetsRequestEmptyResult() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + String groupId = "share-group"; + String topicName = "topic-1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName) + .setPartitions(List.of(partition)) + )); + + ShareGroup shareGroup = mock(ShareGroup.class); + doNothing().when(shareGroup).validateDeleteGroup(); + + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); + + List errorTopicResponseList = List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topicName) + .setTopicId(topicId) + .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))) + ); + + when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any())) + .thenAnswer(invocation -> { + List inputList = invocation.getArgument(2); + inputList.addAll(errorTopicResponseList); + return List.of(); + }); + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList); + + assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); + verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any()); + } + + @Test + public void testShareGroupDeleteOffsetsRequestSuccess() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + String groupId = "share-group"; + String topicName1 = "topic-1"; + Uuid topicId1 = Uuid.randomUuid(); + String topicName2 = "topic-2"; + Uuid topicId2 = Uuid.randomUuid(); + int partition = 0; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(partition)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(partition)) + )); + + ShareGroup shareGroup = mock(ShareGroup.class); + doNothing().when(shareGroup).validateDeleteGroup(); + + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); + + List deleteShareGroupStateRequestTopicsData = + List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )), + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + ); + + when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any())) + .thenReturn(deleteShareGroupStateRequestTopicsData); + + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + List.of(), + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData) + )); + + assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); + verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any()); + } + + @Test + public void testShareGroupDeleteOffsetsRequestSuccessWithErrorTopics() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + String groupId = "share-group"; + String topicName1 = "topic-1"; + Uuid topicId1 = Uuid.randomUuid(); + String topicName2 = "topic-2"; + Uuid topicId2 = Uuid.randomUuid(); + int partition = 0; + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(partition)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(partition)) + )); + + ShareGroup shareGroup = mock(ShareGroup.class); + doNothing().when(shareGroup).validateDeleteGroup(); + + when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup); + + List deleteShareGroupStateRequestTopicsData = + List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + ); + + List errorTopicResponseList = + List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topicName2) + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + )) + ); + + when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any())) + .thenAnswer(invocation -> { + List inputList = invocation.getArgument(2); + + inputList.addAll(errorTopicResponseList); + return deleteShareGroupStateRequestTopicsData; + }); + + + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + errorTopicResponseList, + DeleteShareGroupStateParameters.from( + new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData) + )); + + assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)); + verify(groupMetadataManager, times(1)).shareGroup(eq(groupId)); + verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any()); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 35574d35ca3..4665e81e9be 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -43,6 +43,9 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -20804,6 +20807,362 @@ public class GroupMetadataManagerTest { assertRecordsEquals(expectedRecords, records); } + @Test + public void testSharePartitionsEligibleForOffsetDeletionSuccess() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + String groupId = "share-group"; + Uuid memberId = Uuid.randomUuid(); + String topicName1 = "topic-1"; + String topicName2 = "topic-2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(topicId1, topicName1, 3) + .addTopic(topicId2, topicName2, 2) + .build(); + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + + context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(topicName1, topicName2))); + + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId1) + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)), + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId2) + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )) + .setDeletingTopics(List.of()) + ); + + List expectedResult = List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(2) + )), + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1) + )) + ); + + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )); + List errorTopicResponseList = new ArrayList<>(); + + List result = + context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList); + + assertTrue(errorTopicResponseList.isEmpty()); + assertEquals(expectedResult, result); + } + + @Test + public void testSharePartitionsEligibleForOffsetDeletionErrorTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + String groupId = "share-group"; + Uuid memberId = Uuid.randomUuid(); + String topicName1 = "topic-1"; + String topicName2 = "topic-2"; + Uuid topicId1 = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(topicId1, topicName1, 3) + .build(); + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + + context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(topicName1))); + + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId1) + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)) + )) + .setDeletingTopics(List.of()) + ); + + List expectedResult = List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(2) + )) + ); + + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )); + List errorTopicResponseList = new ArrayList<>(); + + List result = + context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList); + + assertEquals( + List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topicName2) + .setPartitions(List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()), + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + )) + ), + errorTopicResponseList + ); + assertEquals(expectedResult, result); + } + + @Test + public void testSharePartitionsEligibleForOffsetDeletionUninitializedTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + String groupId = "share-group"; + Uuid memberId = Uuid.randomUuid(); + String topicName1 = "topic-1"; + String topicName2 = "topic-2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(topicId1, topicName1, 3) + .addTopic(topicId2, topicName2, 2) + .build(); + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + + context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(topicName1, topicName2))); + + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId1) + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)) + )) + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId2) + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )) + .setDeletingTopics(List.of()) + ); + + List expectedResult = List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(2) + )) + ); + + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )); + List errorTopicResponseList = new ArrayList<>(); + + List result = + context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList); + + assertTrue(errorTopicResponseList.isEmpty()); + assertEquals(expectedResult, result); + } + + @Test + public void testSharePartitionsEligibleForOffsetDeletionUninitializedAndErrorTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + String groupId = "share-group"; + Uuid memberId = Uuid.randomUuid(); + String topicName1 = "topic-1"; + String topicName2 = "topic-2"; + String topicName3 = "topic-3"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(topicId1, topicName1, 3) + .addTopic(topicId2, topicName2, 2) + .build(); + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + + context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(topicName1, topicName2))); + + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId1) + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)) + )) + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId2) + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)) + )) + .setDeletingTopics(List.of()) + ); + + List expectedResult = List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1), + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(2) + )) + ); + + DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List.of(0, 1, 2)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List.of(0, 1)), + new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topicName3) + .setPartitions(List.of(0, 1)) + )); + List errorTopicResponseList = new ArrayList<>(); + + List result = + context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList); + + assertEquals( + List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topicName3) + .setPartitions(List.of( + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()), + new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + )) + ), + errorTopicResponseList + ); + assertEquals(expectedResult, result); + } + @Test public void testShareGroupHeartbeatInitializeOnPartitionUpdate() { MockPartitionAssignor assignor = new MockPartitionAssignor("range");