KAFKA-18629: Delete share group state impl [1/N] (#18712)

Reviewers: Christo Lolov <lolovc@amazon.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-01-28 17:13:01 +05:30 committed by GitHub
parent 5631be20a6
commit f32932cc25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 980 additions and 73 deletions

View File

@ -17,13 +17,16 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DeleteShareGroupStateResponse extends AbstractResponse {
@ -65,4 +68,46 @@ public class DeleteShareGroupStateResponse extends AbstractResponse {
new DeleteShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
public static DeleteShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
return new DeleteShareGroupStateResponseData()
.setResults(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)))));
}
public static DeleteShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(
int partitionId,
Errors error,
String errorMessage
) {
return new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
public static DeleteShareGroupStateResponseData.DeleteStateResult toResponseDeleteStateResult(Uuid topicId, List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults) {
return new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}
public static DeleteShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
return new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId);
}
public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new DeleteShareGroupStateResponseData().setResults(
Collections.singletonList(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}
}

View File

@ -3148,9 +3148,22 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): Unit = {
val deleteShareGroupStateRequest = request.body[DeleteShareGroupStateRequest]
// TODO: Implement the DeleteShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, deleteShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
case Some(coordinator) => coordinator.deleteState(request.context, deleteShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, deleteShareGroupStateRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new DeleteShareGroupStateResponse(response))
}
}
}
}
def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = {

View File

@ -10547,6 +10547,96 @@ class KafkaApisTest extends Logging {
})
}
@Test
def testDeleteShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
val deleteRequestData = new DeleteShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(List(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1)
).asJava)
).asJava)
val deleteStateResultData: util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava
val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)
val response = getDeleteShareGroupResponse(
deleteRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
deleteStateResultData
)
assertNotNull(response.data)
assertEquals(1, response.data.results.size)
}
@Test
def testDeleteShareGroupStateAuthorizationFailed(): Unit = {
val topicId = Uuid.randomUuid();
val deleteRequestData = new DeleteShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(List(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1)
).asJava)
).asJava)
val deleteStateResultData: util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)
val response = getDeleteShareGroupResponse(
deleteRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
deleteStateResultData
)
assertNotNull(response.data)
assertEquals(1, response.data.results.size)
response.data.results.forEach(deleteResult => {
assertEquals(1, deleteResult.partitions.size)
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode())
})
}
def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
@ -10663,4 +10753,33 @@ class KafkaApisTest extends Logging {
}
response
}
def getDeleteShareGroupResponse(requestData: DeleteShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
deleteStateResult: util.List[DeleteShareGroupStateResponseData.DeleteStateResult]): DeleteShareGroupStateResponse = {
val requestChannelRequest = buildRequest(new DeleteShareGroupStateRequest.Builder(requestData, true).build())
val future = new CompletableFuture[DeleteShareGroupStateResponseData]()
when(shareCoordinator.deleteState(
any[RequestContext],
any[DeleteShareGroupStateRequestData]
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
future.complete(new DeleteShareGroupStateResponseData()
.setResults(deleteStateResult))
val response = verifyNoThrottling[DeleteShareGroupStateResponse](requestChannelRequest)
if (verifyNoErr) {
val expectedDeleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
.setResults(deleteStateResult)
assertEquals(expectedDeleteShareGroupStateResponseData, response.data)
}
response
}
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -86,6 +88,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request);
/**
* Handle delete share group state call
* @param context - represents the incoming delete share group request context
* @param request - actual RPC request object
* @return completable future representing delete share group RPC response data
*/
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)

View File

@ -71,4 +71,14 @@ public class ShareCoordinatorRecordHelpers {
)
);
}
public static CoordinatorRecord newShareStateTombstoneRecord(String groupId, Uuid topicId, int partitionId) {
// Always generate share snapshot type record for tombstone.
return CoordinatorRecord.tombstone(
new ShareSnapshotKey()
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId)
);
}
}

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -29,6 +31,7 @@ import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
@ -694,6 +697,116 @@ public class ShareCoordinatorService implements ShareCoordinator {
});
}
@Override
public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request) {
// Send an empty response if the coordinator is not active.
if (!isActive.get()) {
return CompletableFuture.completedFuture(
generateErrorDeleteStateResponse(
request,
Errors.COORDINATOR_NOT_AVAILABLE,
"Share coordinator is not available."
)
);
}
String groupId = request.groupId();
// Send an empty response if groupId is invalid.
if (isGroupIdEmpty(groupId)) {
log.error("Group id must be specified and non-empty: {}", request);
return CompletableFuture.completedFuture(
new DeleteShareGroupStateResponseData()
);
}
// Send an empty response if topic data is empty.
if (isEmpty(request.topics())) {
log.error("Topic Data is empty: {}", request);
return CompletableFuture.completedFuture(
new DeleteShareGroupStateResponseData()
);
}
// Send an empty response if partition data is empty for any topic.
for (DeleteShareGroupStateRequestData.DeleteStateData topicData : request.topics()) {
if (isEmpty(topicData.partitions())) {
log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request);
return CompletableFuture.completedFuture(
new DeleteShareGroupStateResponseData()
);
}
}
// A map to store the futures for each topicId and partition.
Map<Uuid, Map<Integer, CompletableFuture<DeleteShareGroupStateResponseData>>> futureMap = new HashMap<>();
// The request received here could have multiple keys of structure group:topic:partition. However,
// the deleteState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
// be looping over the keys below and constructing new DeleteShareGroupStateRequestData objects to pass
// onto the shard method.
for (DeleteShareGroupStateRequestData.DeleteStateData topicData : request.topics()) {
Uuid topicId = topicData.topicId();
for (DeleteShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
DeleteShareGroupStateRequestData requestForCurrentPartition = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(List.of(partitionData))));
CompletableFuture<DeleteShareGroupStateResponseData> deleteFuture = runtime.scheduleWriteOperation(
"delete-share-group-state",
topicPartitionFor(coordinatorKey),
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
coordinator -> coordinator.deleteState(requestForCurrentPartition)
).exceptionally(deleteException ->
handleOperationException(
"delete-share-group-state",
request,
deleteException,
(error, message) -> DeleteShareGroupStateResponse.toErrorResponseData(
topicData.topicId(),
partitionData.partition(),
error,
"Unable to delete share group state: " + deleteException.getMessage()
),
log
));
futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
.put(partitionData.partition(), deleteFuture);
}
}
// Combine all futures into a single CompletableFuture<Void>.
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
.flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
// Transform the combined CompletableFuture<Void> into CompletableFuture<DeleteShareGroupStateResponseData>.
return combinedFuture.thenApply(v -> {
List<DeleteShareGroupStateResponseData.DeleteStateResult> deleteStateResult = new ArrayList<>(futureMap.size());
futureMap.forEach(
(topicId, topicEntry) -> {
List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFuture) -> {
// ResponseFut would already be completed by now since we have used
// CompletableFuture::allOf to create a combined future from the future map.
partitionResults.add(
responseFuture.getNow(null).results().get(0).partitions().get(0)
);
}
);
deleteStateResult.add(DeleteShareGroupStateResponse.toResponseDeleteStateResult(topicId, partitionResults));
}
);
return new DeleteShareGroupStateResponseData()
.setResults(deleteStateResult);
});
}
private ReadShareGroupStateResponseData generateErrorReadStateResponse(
ReadShareGroupStateRequestData request,
Errors error,
@ -746,6 +859,23 @@ public class ShareCoordinatorService implements ShareCoordinator {
}).collect(Collectors.toList()));
}
private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
DeleteShareGroupStateRequestData request,
Errors error,
String errorMessage
) {
return new DeleteShareGroupStateResponseData().setResults(request.topics().stream()
.map(topicData -> {
DeleteShareGroupStateResponseData.DeleteStateResult resultData = new DeleteShareGroupStateResponseData.DeleteStateResult();
resultData.setTopicId(topicData.topicId());
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).collect(Collectors.toList()));
return resultData;
}).collect(Collectors.toList()));
}
private static boolean isGroupIdEmpty(String groupId) {
return groupId == null || groupId.isEmpty();
}

View File

@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -28,6 +30,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.TransactionResult;
@ -228,16 +231,25 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value, long offset) {
SharePartitionKey mapKey = SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
if (value == null) {
log.debug("Tombstone records received for share partition key: {}", mapKey);
// Consider this a tombstone.
shareStateMap.remove(mapKey);
leaderEpochMap.remove(mapKey);
stateEpochMap.remove(mapKey);
snapshotUpdateCount.remove(mapKey);
} else {
maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
// this record is the complete snapshot
shareStateMap.put(mapKey, offsetRecord);
// if number of share updates is exceeded, then reset it
if (snapshotUpdateCount.containsKey(mapKey)) {
if (snapshotUpdateCount.get(mapKey) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
snapshotUpdateCount.put(mapKey, 0);
ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
// This record is the complete snapshot.
shareStateMap.put(mapKey, offsetRecord);
// If number of share updates is exceeded, then reset it.
if (snapshotUpdateCount.containsKey(mapKey)) {
if (snapshotUpdateCount.get(mapKey) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
snapshotUpdateCount.put(mapKey, 0);
}
}
}
@ -475,6 +487,49 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
);
}
/**
* This method writes tombstone records corresponding to the requested topic partitions.
* <p>
* This method as called by the ShareCoordinatorService will be provided with
* the request data which covers only key i.e. group1:topic1:partition1. The implementation
* below was done keeping this in mind.
*
* @param request - ReadShareGroupStateSummaryRequestData for a single key
* @return CoordinatorResult(records, response)
*/
public CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> deleteState(
DeleteShareGroupStateRequestData request
) {
// Records to write (with both key and value of snapshot type), response to caller
// only one key will be there in the request by design.
Optional<CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetDeleteStateError(request);
if (error.isPresent()) {
return error.get();
}
DeleteShareGroupStateRequestData.DeleteStateData topicData = request.topics().get(0);
DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
CoordinatorRecord record = generateTombstoneRecord(key);
// build successful response if record is correctly created
DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData()
.setResults(
List.of(
DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
List.of(
DeleteShareGroupStateResponse.toResponsePartitionResult(
key.partition()
)
)
)
)
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
@ -537,6 +592,14 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
}
private CoordinatorRecord generateTombstoneRecord(SharePartitionKey key) {
return ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
key.groupId(),
key.topicId(),
key.partition()
);
}
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData) {
@ -670,6 +733,37 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return Optional.empty();
}
private Optional<CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord>> maybeGetDeleteStateError(
DeleteShareGroupStateRequestData request
) {
DeleteShareGroupStateRequestData.DeleteStateData topicData = request.topics().get(0);
DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
Uuid topicId = topicData.topicId();
int partitionId = partitionData.partition();
if (topicId == null) {
return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null) {
log.error("Topic/TopicPartition not found in metadata image.");
return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
return Optional.empty();
}
private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorResponse(
Errors error,
Exception exception,
@ -681,6 +775,17 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(Collections.emptyList(), responseData);
}
private CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> getDeleteErrorResponse(
Errors error,
Exception exception,
Uuid topicId,
int partitionId
) {
String message = exception == null ? error.message() : exception.getMessage();
DeleteShareGroupStateResponseData responseData = DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
return new CoordinatorResult<>(Collections.emptyList(), responseData);
}
// Visible for testing
Integer getLeaderMapValue(SharePartitionKey key) {
return this.leaderEpochMap.get(key);

View File

@ -112,4 +112,25 @@ public class ShareCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record);
}
@Test
public void testNewShareStateTombstoneRecord() {
String groupId = "test-group";
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
groupId,
topicId,
partitionId
);
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new ShareSnapshotKey()
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId)
);
assertEquals(expectedRecord, record);
}
}

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -49,7 +51,6 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -79,7 +80,7 @@ class ShareCoordinatorServiceTest {
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockRuntime() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mock(CoordinatorRuntime.class);
when(runtime.activeTopicPartitions())
.thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
.thenReturn(List.of(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
return runtime;
}
@ -133,13 +134,13 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -148,13 +149,13 @@ class ShareCoordinatorServiceTest {
)),
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -165,18 +166,18 @@ class ShareCoordinatorServiceTest {
);
WriteShareGroupStateResponseData response1 = new WriteShareGroupStateResponseData()
.setResults(Collections.singletonList(
.setResults(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))
));
WriteShareGroupStateResponseData response2 = new WriteShareGroupStateResponseData()
.setResults(Collections.singletonList(
.setResults(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)))
));
@ -199,11 +200,11 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(Arrays.asList(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2))),
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))));
assertEquals(expectedResult, result);
verify(time, times(2)).hiResClockMs();
@ -243,14 +244,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@ -260,12 +261,12 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateResponseData.ReadStateResult topicData1 = new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(new ReadShareGroupStateResponseData.StateBatch()
.setStateBatches(List.of(new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -275,7 +276,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateResponseData.ReadStateResult topicData2 = new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@ -301,9 +302,9 @@ class ShareCoordinatorServiceTest {
any()
))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(topicData1))))
.setResults(List.of(topicData1))))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(topicData2))));
.setResults(List.of(topicData2))));
CompletableFuture<ReadShareGroupStateResponseData> future = service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
@ -345,14 +346,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@ -362,7 +363,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@ -371,7 +372,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData2 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@ -385,9 +386,9 @@ class ShareCoordinatorServiceTest {
any()
))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(Collections.singletonList(topicData1))))
.setResults(List.of(topicData1))))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(Collections.singletonList(topicData2))));
.setResults(List.of(topicData2))));
CompletableFuture<ReadShareGroupStateSummaryResponseData> future = service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@ -402,6 +403,93 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
@Test
public void testDeleteStateSuccess() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Metrics metrics = new Metrics();
ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
Time time = mock(Time.class);
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
coordinatorMetrics,
time,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId1 = Uuid.randomUuid();
int partition1 = 0;
Uuid topicId2 = Uuid.randomUuid();
int partition2 = 1;
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
))
)
);
DeleteShareGroupStateResponseData response1 = new DeleteShareGroupStateResponseData()
.setResults(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))
));
DeleteShareGroupStateResponseData response2 = new DeleteShareGroupStateResponseData()
.setResults(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)))
));
when(runtime.scheduleWriteOperation(
eq("delete-share-group-state"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
eq(Duration.ofMillis(5000)),
any()
))
.thenReturn(CompletableFuture.completedFuture(response1))
.thenReturn(CompletableFuture.completedFuture(response2));
CompletableFuture<DeleteShareGroupStateResponseData> future = service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
request
);
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(Arrays.asList(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2))),
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))));
assertEquals(expectedResult, result);
}
@Test
public void testWriteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -433,7 +521,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new WriteShareGroupStateResponseData(),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@ -442,8 +530,8 @@ class ShareCoordinatorServiceTest {
assertEquals(new WriteShareGroupStateResponseData(),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
new WriteShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
new WriteShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
@ -480,7 +568,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateResponseData(),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@ -489,8 +577,8 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateResponseData(),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
new ReadShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
new ReadShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
@ -527,7 +615,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateSummaryResponseData(),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(List.of(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@ -536,13 +624,60 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateSummaryResponseData(),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
new ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(Collections.singletonList(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(Collections.singletonList(
new ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(List.of(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
}
@Test
public void testDeleteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
// 1. Empty topicsData
assertEquals(new DeleteShareGroupStateResponseData(),
service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
new DeleteShareGroupStateRequestData().setGroupId(groupId)
).get(5, TimeUnit.SECONDS)
);
// 2. Empty partitionsData
assertEquals(new DeleteShareGroupStateResponseData(),
service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
new DeleteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
// 3. Invalid groupId
assertEquals(new DeleteShareGroupStateResponseData(),
service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
new DeleteShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
}
@Test
public void testWriteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -568,13 +703,13 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -583,13 +718,13 @@ class ShareCoordinatorServiceTest {
)),
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -609,13 +744,13 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(Arrays.asList(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available.")))));
@ -647,14 +782,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@ -672,13 +807,13 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult> expectedResult = new HashSet<>(Arrays.asList(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available.")))));
@ -710,14 +845,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@ -735,13 +870,74 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> expectedResult = new HashSet<>(Arrays.asList(
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available.")))));
assertEquals(expectedResult, result);
}
@Test
public void testDeleteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
String groupId = "group1";
Uuid topicId1 = Uuid.randomUuid();
int partition1 = 0;
Uuid topicId2 = Uuid.randomUuid();
int partition2 = 1;
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
))
)
);
CompletableFuture<DeleteShareGroupStateResponseData> future = service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
request
);
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(Arrays.asList(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available.")))));
@ -771,23 +967,23 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(new WriteShareGroupStateResponseData()
.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Unable to write share group state: This server does not host this topic-partition."))))),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
new WriteShareGroupStateRequestData().setGroupId(groupId)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
.setStartOffset(1)
.setStateEpoch(1)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(2)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -821,18 +1017,18 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult()
.setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Unable to read share group state: The server experienced an unexpected error when processing the request."))))),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
new ReadShareGroupStateRequestData().setGroupId(groupId)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
))
@ -864,18 +1060,18 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateSummaryResponseData()
.setResults(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Unable to read share group state summary: The server experienced an unexpected error when processing the request."))))),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
new ReadShareGroupStateSummaryRequestData().setGroupId(groupId)
.setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
))
@ -884,6 +1080,48 @@ class ShareCoordinatorServiceTest {
);
}
@Test
public void testDeleteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any(), any()))
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(new DeleteShareGroupStateResponseData()
.setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Unable to delete share group state: This server does not host this topic-partition."))))),
service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
new DeleteShareGroupStateRequestData().setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
).get(5, TimeUnit.SECONDS)
);
}
@Test
public void testTopicPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

View File

@ -18,6 +18,8 @@
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -26,6 +28,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@ -40,6 +43,7 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.SharePartitionKey;
@ -58,10 +62,12 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -957,6 +963,216 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
@Test
public void testDeleteStateSuccess() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
// apply a record in to verify delete
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID,
TOPIC_ID,
PARTITION,
new ShareGroupOffset.Builder()
.setSnapshotEpoch(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(List.of(
new PersisterStateBatch(
0,
10,
(byte) 0,
(short) 1
)
)
)
.build()
);
shard.replay(0L, 0L, (short) 0, record);
assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
// apply tombstone
shard.replay(0L, 0L, (short) 0, result.records().get(0));
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = List.of(
ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
GROUP_ID, TOPIC_ID, PARTITION)
);
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
}
@Test
public void testDeleteStateFirstRecordDeleteSuccess() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
// apply tombstone
shard.replay(0L, 0L, (short) 0, result.records().get(0));
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = List.of(
ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
GROUP_ID, TOPIC_ID, PARTITION)
);
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
}
@Test
public void testDeleteStateInvalidRequestData() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
// invalid partition
int partition = -1;
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
assertEquals(expectedRecords, result.records());
}
@Test
public void testDeleteNullMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
shard.onNewMetadataImage(null, null);
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
}
@Test
public void testDeleteTopicIdNonExistentInMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
MetadataImage image = mock(MetadataImage.class);
shard.onNewMetadataImage(image, null);
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0)))));
// topic id not found in cache
TopicsImage topicsImage = mock(TopicsImage.class);
when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(
null
);
when(image.topics()).thenReturn(
topicsImage
);
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
}
@Test
public void testDeletePartitionIdNonExistentInMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
MetadataImage image = mock(MetadataImage.class);
shard.onNewMetadataImage(image, null);
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0)))));
// topic id found in cache
TopicsImage topicsImage = mock(TopicsImage.class);
when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(
mock(TopicImage.class)
);
when(image.topics()).thenReturn(
topicsImage
);
// partition id not found
when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(
null
);
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
}
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);