KAFKA-19115: Utilize initialized topics info to verify delete share group offsets (#19431)

Currently, in the deleteShareGroupOffsets method in
GroupCoordinatorService, the user request was simply forwarded to the
persister without checking if the requested share partitions were
initialized for the group or not. This PR introduces such a check to
make sure that the persister deleteState request is only called for
share partitions that have been initialized for the group.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-04-14 16:40:40 +05:30 committed by GitHub
parent fc25436440
commit 270948bf9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1387 additions and 318 deletions

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@ -1258,45 +1257,40 @@ public class GroupCoordinatorService implements GroupCoordinator {
});
}
private void populateDeleteShareGroupOffsetsFuture(
DeleteShareGroupOffsetsRequestData requestData,
CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
Map<Uuid, String> requestTopicIdToNameMapping,
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList
private CompletableFuture<DeleteShareGroupOffsetsResponseData> persistDeleteShareGroupOffsets(
DeleteShareGroupStateParameters deleteStateRequestParameters,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
) {
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData);
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
.whenComplete((result, error) -> {
if (error != null) {
log.error("Failed to delete share group state");
future.completeExceptionally(error);
return;
}
return persister.deleteState(deleteStateRequestParameters)
.thenCompose(result -> {
if (result == null || result.topicsData() == null) {
log.error("Result is null for the delete share group state");
future.completeExceptionally(new IllegalStateException("Result is null for the delete share group state"));
return;
Exception exception = new IllegalStateException("Result is null for the delete share group state");
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
);
}
result.topicsData().forEach(topicData ->
deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
.setPartitions(topicData.partitions().stream().map(
partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
.setErrorCode(partitionData.errorCode())
).toList())
));
errorTopicResponseList.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
.setPartitions(topicData.partitions().stream().map(
partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
.setErrorCode(partitionData.errorCode())
).toList())
)
);
future.complete(
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(deleteShareGroupOffsetsResponseTopicList));
.setResponses(errorTopicResponseList)
);
}).exceptionally(throwable -> {
log.error("Failed to delete share group state");
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
});
}
@ -1590,83 +1584,53 @@ public class GroupCoordinatorService implements GroupCoordinator {
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size());
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size());
requestData.topics().forEach(topic -> {
Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
if (topicId != null) {
requestTopicIdToNameMapping.put(topicId, topic.topicName());
deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(
topic.partitions().stream().map(
partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
).toList()
));
} else {
deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setPartitions(topic.partitions().stream().map(
partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
).toList()));
}
});
// If the request for the persister is empty, just complete the operation right away.
if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
if (requestData.topics() == null || requestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(deleteShareGroupOffsetsResponseTopicList));
);
}
CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>();
return runtime.scheduleReadOperation(
"share-group-delete-offsets-request",
topicPartitionFor(groupId),
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
)
.thenCompose(resultHolder -> {
if (resultHolder == null) {
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId);
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
);
}
TopicPartition topicPartition = topicPartitionFor(groupId);
if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
resultHolder.topLevelErrorCode(),
resultHolder.topLevelErrorMessage()
)
);
}
// This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group.
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describeGroupFuture =
runtime.scheduleReadOperation(
"share-group-describe",
topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"share-group-describe",
List.of(groupId),
exception,
(error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
log
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList =
resultHolder.errorTopicResponseList() == null ? new ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
describeGroupFuture.whenComplete((groups, throwable) -> {
if (throwable != null) {
log.error("Failed to describe the share group {}", groupId, throwable);
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
} else if (groups == null || groups.isEmpty()) {
log.error("Describe share group resulted in empty response for group {}", groupId);
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
} else if (groups.get(0).errorCode() != Errors.NONE.code()) {
log.error("Failed to describe the share group {}", groupId);
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(), groups.get(0).errorMessage()));
} else if (groups.get(0).members() != null && !groups.get(0).members().isEmpty()) {
log.error("Provided group {} is not empty", groupId);
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
} else {
populateDeleteShareGroupOffsetsFuture(
requestData,
future,
requestTopicIdToNameMapping,
deleteShareGroupStateRequestTopicsData,
deleteShareGroupOffsetsResponseTopicList
if (resultHolder.deleteStateRequestParameters() == null) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(errorTopicResponseList)
);
}
return persistDeleteShareGroupOffsets(
resultHolder.deleteStateRequestParameters(),
errorTopicResponseList
);
}
});
return future;
})
.exceptionally(throwable -> {
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId, throwable);
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
});
}
/**

View File

@ -26,6 +26,9 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@ -131,6 +134,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -290,6 +294,69 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
}
public static class DeleteShareGroupOffsetsResultHolder {
private final short topLevelErrorCode;
private final String topLevelErrorMessage;
private final List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList;
private final DeleteShareGroupStateParameters deleteStateRequestParameters;
DeleteShareGroupOffsetsResultHolder(short topLevelErrorCode, String topLevelErrorMessage) {
this(topLevelErrorCode, topLevelErrorMessage, null, null);
}
DeleteShareGroupOffsetsResultHolder(
short topLevelErrorCode,
String topLevelErrorMessage,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
) {
this(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, null);
}
DeleteShareGroupOffsetsResultHolder(
short topLevelErrorCode,
String topLevelErrorMessage,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList,
DeleteShareGroupStateParameters deleteStateRequestParameters
) {
this.topLevelErrorCode = topLevelErrorCode;
this.topLevelErrorMessage = topLevelErrorMessage;
this.errorTopicResponseList = errorTopicResponseList;
this.deleteStateRequestParameters = deleteStateRequestParameters;
}
public short topLevelErrorCode() {
return this.topLevelErrorCode;
}
public String topLevelErrorMessage() {
return this.topLevelErrorMessage;
}
public List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList() {
return this.errorTopicResponseList;
}
public DeleteShareGroupStateParameters deleteStateRequestParameters() {
return this.deleteStateRequestParameters;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteShareGroupOffsetsResultHolder other = (DeleteShareGroupOffsetsResultHolder) o;
return topLevelErrorCode == other.topLevelErrorCode &&
Objects.equals(topLevelErrorMessage, other.topLevelErrorMessage) &&
Objects.equals(errorTopicResponseList, other.errorTopicResponseList) &&
Objects.equals(deleteStateRequestParameters, other.deleteStateRequestParameters);
}
@Override
public int hashCode() {
return Objects.hash(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, deleteStateRequestParameters);
}
}
/**
* The group/offsets expiration key to schedule a timer task.
*
@ -613,6 +680,57 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(records, responseMap);
}
/**
* Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further
* 1. Checks whether the provided group is empty
* 2. Checks the requested topics are presented in the metadataImage
* 3. Checks the requested share partitions are initialized for the group
*
* @param groupId - The group ID
* @param requestData - The request data for DeleteShareGroupOffsetsRequest
* @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses
* and persister deleteState request parameters
*/
public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
String groupId,
DeleteShareGroupOffsetsRequestData requestData
) {
try {
ShareGroup group = groupMetadataManager.shareGroup(groupId);
group.validateDeleteGroup();
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData =
groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
groupId,
requestData,
errorTopicResponseList
);
if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
return new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
}
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData);
return new DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
} catch (GroupIdNotFoundException exception) {
log.error("groupId {} not found", groupId, exception);
return new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());
} catch (GroupNotEmptyException exception) {
log.error("Provided group {} is not empty", groupId);
return new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());
}
}
/**
* Fetch offsets for a given set of partitions and a given group.
*

View File

@ -41,6 +41,9 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@ -8195,6 +8198,52 @@ public class GroupMetadataManager {
);
}
/**
* Returns a list of delete share group state request topic objects to be used with the persister.
* @param groupId - group ID of the share group
* @param requestData - the request data for DeleteShareGroupOffsets request
* @param errorTopicResponseList - the list of topics not found in the metadata image
* @return List of objects representing the share group state delete request for topics.
*/
public List<DeleteShareGroupStateRequestData.DeleteStateData> sharePartitionsEligibleForOffsetDeletion(
String groupId,
DeleteShareGroupOffsetsRequestData requestData,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
) {
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>();
Map<Uuid, Set<Integer>> initializedSharePartitions = initializedShareGroupPartitions(groupId);
requestData.topics().forEach(topic -> {
Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
if (topicId != null) {
// A deleteState request to persister should only be sent with those topic partitions for which corresponding
// share partitions are initialized for the group.
if (initializedSharePartitions.containsKey(topicId)) {
List<DeleteShareGroupStateRequestData.PartitionData> partitions = new ArrayList<>();
topic.partitions().forEach(partition -> {
if (initializedSharePartitions.get(topicId).contains(partition)) {
partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
}
});
deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(partitions));
}
} else {
errorTopicResponseList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setPartitions(topic.partitions().stream().map(
partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
).collect(Collectors.toCollection(ArrayList::new))));
}
});
return deleteShareGroupStateRequestTopicsData;
}
/**
* Validates the DeleteGroups request.
*

View File

@ -121,7 +121,6 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -3298,9 +3297,11 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
@ -3317,14 +3318,30 @@ public class GroupCoordinatorServiceTest {
.setErrorMessage(null))))
);
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
@ -3343,16 +3360,18 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
@ -3380,14 +3399,19 @@ public class GroupCoordinatorServiceTest {
)
);
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
@ -3401,152 +3425,6 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("badtopic")
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName("badtopic")
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
);
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate delete share group state request")));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(Exception.class, future, "Unable to validate delete share group state request");
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state");
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
DeleteShareGroupStateResult deleteShareGroupStateResult =
new DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state");
}
@Test
public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -3632,7 +3510,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsDescribeThrowsError() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsEmptyRequest() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -3642,9 +3520,33 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id");
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsRequestThrowsError() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
@ -3655,10 +3557,10 @@ public class GroupCoordinatorServiceTest {
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
)).thenReturn(CompletableFuture.completedFuture(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
@ -3667,7 +3569,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsRequestReturnsNull() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -3677,20 +3579,22 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
@ -3702,7 +3606,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsRequestReturnsGroupIdNotFound() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -3712,9 +3616,11 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
@ -3724,11 +3630,19 @@ public class GroupCoordinatorServiceTest {
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.GROUP_ID_NOT_FOUND.code(),
Errors.GROUP_ID_NOT_FOUND.message(),
null,
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
@ -3737,7 +3651,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsError() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsRequestReturnsGroupNotEmpty() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -3747,48 +3661,11 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
@ -3798,15 +3675,415 @@ public class GroupCoordinatorServiceTest {
.setErrorCode(Errors.NON_EMPTY_GROUP.code())
.setErrorMessage(Errors.NON_EMPTY_GROUP.message());
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1")
.setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NON_EMPTY_GROUP.code(),
Errors.NON_EMPTY_GROUP.message(),
null,
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsRequestReturnsNullParameters() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
null,
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsRequestReturnsNullParametersWithErrorTopics() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String badTopicName = "bad-topic";
Uuid badTopicId = Uuid.randomUuid();
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))),
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new Exception("Unable to validate delete share group state request");
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.forException(persisterException).code())
.setErrorMessage(Errors.forException(persisterException).message());
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.failedFuture(persisterException));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new IllegalStateException("Result is null for the delete share group state");
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.forException(persisterException).code())
.setErrorMessage(Errors.forException(persisterException).message());
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new IllegalStateException("Result is null for the delete share group state");
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.forException(persisterException).code())
.setErrorMessage(Errors.forException(persisterException).message());
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateResult deleteShareGroupStateResult =
new DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
when(persister.deleteState(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsSuccessWithErrorTopicPartitions() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String badTopicName = "bad-topic";
Uuid badTopicId = Uuid.randomUuid();
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName)
.setPartitions(List.of(partition))
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)))));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
)),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
))
)
);
DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
.setResults(
List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)))
)
);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))),
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
when(persister.deleteState(
ArgumentMatchers.eq(deleteShareGroupStateParameters)
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

View File

@ -23,6 +23,9 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
@ -124,6 +127,7 @@ import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -1949,4 +1953,302 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager, times(0)).group(eq("share-group"));
verify(groupMetadataManager, times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
}
@Test
public void testShareGroupDeleteOffsetsRequestGroupNotFound() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("topic-1")
.setPartitions(List.of(0))
));
GroupIdNotFoundException exception = new GroupIdNotFoundException("group Id not found");
doThrow(exception).when(groupMetadataManager).shareGroup(eq(groupId));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage());
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of Group not found.
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestNonEmptyShareGroup() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("topic-1")
.setPartitions(List.of(0))
));
ShareGroup shareGroup = mock(ShareGroup.class);
GroupNotEmptyException exception = new GroupNotEmptyException("group is not empty");
doThrow(exception).when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage());
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of Group not found.
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestEmptyResult() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
String topicName = "topic-1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
doNothing().when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName)
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
.thenAnswer(invocation -> {
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> inputList = invocation.getArgument(2);
inputList.addAll(errorTopicResponseList);
return List.of();
});
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestSuccess() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
String topicName1 = "topic-1";
Uuid topicId1 = Uuid.randomUuid();
String topicName2 = "topic-2";
Uuid topicId2 = Uuid.randomUuid();
int partition = 0;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(partition)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
doNothing().when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData =
List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
.thenReturn(deleteShareGroupStateRequestTopicsData);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
));
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestSuccessWithErrorTopics() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
String topicName1 = "topic-1";
Uuid topicId1 = Uuid.randomUuid();
String topicName2 = "topic-2";
Uuid topicId2 = Uuid.randomUuid();
int partition = 0;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(partition)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
doNothing().when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData =
List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
);
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList =
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
.thenAnswer(invocation -> {
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> inputList = invocation.getArgument(2);
inputList.addAll(errorTopicResponseList);
return deleteShareGroupStateRequestTopicsData;
});
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
));
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
}
}

View File

@ -43,6 +43,9 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@ -20804,6 +20807,362 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, records);
}
@Test
public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
);
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(2)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1)
))
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
assertTrue(errorTopicResponseList.isEmpty());
assertEquals(expectedResult, result);
}
@Test
public void testSharePartitionsEligibleForOffsetDeletionErrorTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1)));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setDeletingTopics(List.of())
);
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(2)
))
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
assertEquals(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
),
errorTopicResponseList
);
assertEquals(expectedResult, result);
}
@Test
public void testSharePartitionsEligibleForOffsetDeletionUninitializedTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
);
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(2)
))
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
assertTrue(errorTopicResponseList.isEmpty());
assertEquals(expectedResult, result);
}
@Test
public void testSharePartitionsEligibleForOffsetDeletionUninitializedAndErrorTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
String topicName3 = "topic-3";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
);
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(2)
))
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1)),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName3)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
assertEquals(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
),
errorTopicResponseList
);
assertEquals(expectedResult, result);
}
@Test
public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");