diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java index 78c87a63987..69087ebc0c8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java @@ -17,13 +17,16 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class DeleteShareGroupStateResponse extends AbstractResponse { @@ -65,4 +68,46 @@ public class DeleteShareGroupStateResponse extends AbstractResponse { new DeleteShareGroupStateResponseData(new ByteBufferAccessor(buffer), version) ); } + + public static DeleteShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) { + return new DeleteShareGroupStateResponseData() + .setResults(List.of( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId))))); + } + + public static DeleteShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult( + int partitionId, + Errors error, + String errorMessage + ) { + return new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static DeleteShareGroupStateResponseData.DeleteStateResult toResponseDeleteStateResult(Uuid topicId, List partitionResults) { + return new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(partitionResults); + } + + public static DeleteShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) { + return new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId); + } + + public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) { + return new DeleteShareGroupStateResponseData().setResults( + Collections.singletonList(new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage))))); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 11623b02a3d..82132d868bf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3148,9 +3148,22 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): Unit = { val deleteShareGroupStateRequest = request.body[DeleteShareGroupStateRequest] - // TODO: Implement the DeleteShareGroupStateRequest handling - requestHelper.sendMaybeThrottle(request, deleteShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + shareCoordinator match { + case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs, + new ApiException("Share coordinator is not enabled."))) + + case Some(coordinator) => coordinator.deleteState(request.context, deleteShareGroupStateRequest.data) + .handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, deleteShareGroupStateRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new DeleteShareGroupStateResponse(response)) + } + } + } } def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 18930be5ee6..bcf9ec587d5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10547,6 +10547,96 @@ class KafkaApisTest extends Logging { }) } + @Test + def testDeleteShareGroupStateSuccess(): Unit = { + val topicId = Uuid.randomUuid(); + val deleteRequestData = new DeleteShareGroupStateRequestData() + .setGroupId("group1") + .setTopics(List( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(List( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1) + ).asJava) + ).asJava) + + val deleteStateResultData: util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(List( + new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + ).asJava) + ).asJava + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getDeleteShareGroupResponse( + deleteRequestData, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = true, + null, + deleteStateResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + } + + @Test + def testDeleteShareGroupStateAuthorizationFailed(): Unit = { + val topicId = Uuid.randomUuid(); + val deleteRequestData = new DeleteShareGroupStateRequestData() + .setGroupId("group1") + .setTopics(List( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(List( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(1) + ).asJava) + ).asJava) + + val deleteStateResultData: util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(List( + new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + ).asJava) + ).asJava + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getDeleteShareGroupResponse( + deleteRequestData, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = false, + authorizer, + deleteStateResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + response.data.results.forEach(deleteResult => { + assertEquals(1, deleteResult.partitions.size) + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode()) + }) + } + def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty, verifyNoErr: Boolean = true, authorizer: Authorizer = null, describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = { @@ -10663,4 +10753,33 @@ class KafkaApisTest extends Logging { } response } + + def getDeleteShareGroupResponse(requestData: DeleteShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty, + verifyNoErr: Boolean = true, authorizer: Authorizer = null, + deleteStateResult: util.List[DeleteShareGroupStateResponseData.DeleteStateResult]): DeleteShareGroupStateResponse = { + val requestChannelRequest = buildRequest(new DeleteShareGroupStateRequest.Builder(requestData, true).build()) + + val future = new CompletableFuture[DeleteShareGroupStateResponseData]() + when(shareCoordinator.deleteState( + any[RequestContext], + any[DeleteShareGroupStateRequestData] + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + kafkaApis = createKafkaApis( + overrideProperties = configOverrides, + authorizer = Option(authorizer), + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching()) + + future.complete(new DeleteShareGroupStateResponseData() + .setResults(deleteStateResult)) + + val response = verifyNoThrottling[DeleteShareGroupStateResponse](requestChannelRequest) + if (verifyNoErr) { + val expectedDeleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData() + .setResults(deleteStateResult) + assertEquals(expectedDeleteShareGroupStateResponseData, response.data) + } + response + } } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java index 72427ac8705..61e96e37f07 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java @@ -17,6 +17,8 @@ package org.apache.kafka.coordinator.share; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; +import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -86,6 +88,14 @@ public interface ShareCoordinator { */ CompletableFuture readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request); + /** + * Handle delete share group state call + * @param context - represents the incoming delete share group request context + * @param request - actual RPC request object + * @return completable future representing delete share group RPC response data + */ + CompletableFuture deleteState(RequestContext context, DeleteShareGroupStateRequestData request); + /** * Called when new coordinator is elected * @param partitionIndex - The partition index (internal topic) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java index e1e9e9a11f9..17c869c4499 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java @@ -71,4 +71,14 @@ public class ShareCoordinatorRecordHelpers { ) ); } + + public static CoordinatorRecord newShareStateTombstoneRecord(String groupId, Uuid topicId, int partitionId) { + // Always generate share snapshot type record for tombstone. + return CoordinatorRecord.tombstone( + new ShareSnapshotKey() + .setGroupId(groupId) + .setTopicId(topicId) + .setPartition(partitionId) + ); + } } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 05fcffb43c6..3139e2ef07d 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; +import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -29,6 +31,7 @@ import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.RequestContext; @@ -694,6 +697,116 @@ public class ShareCoordinatorService implements ShareCoordinator { }); } + @Override + public CompletableFuture deleteState(RequestContext context, DeleteShareGroupStateRequestData request) { + // Send an empty response if the coordinator is not active. + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorDeleteStateResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + String groupId = request.groupId(); + // Send an empty response if groupId is invalid. + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new DeleteShareGroupStateResponseData() + ); + } + + // Send an empty response if topic data is empty. + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new DeleteShareGroupStateResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic. + for (DeleteShareGroupStateRequestData.DeleteStateData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new DeleteShareGroupStateResponseData() + ); + } + } + + // A map to store the futures for each topicId and partition. + Map>> futureMap = new HashMap<>(); + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the deleteState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new DeleteShareGroupStateRequestData objects to pass + // onto the shard method. + + for (DeleteShareGroupStateRequestData.DeleteStateData topicData : request.topics()) { + Uuid topicId = topicData.topicId(); + for (DeleteShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) { + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + + DeleteShareGroupStateRequestData requestForCurrentPartition = new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(List.of(partitionData)))); + + CompletableFuture deleteFuture = runtime.scheduleWriteOperation( + "delete-share-group-state", + topicPartitionFor(coordinatorKey), + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + coordinator -> coordinator.deleteState(requestForCurrentPartition) + ).exceptionally(deleteException -> + handleOperationException( + "delete-share-group-state", + request, + deleteException, + (error, message) -> DeleteShareGroupStateResponse.toErrorResponseData( + topicData.topicId(), + partitionData.partition(), + error, + "Unable to delete share group state: " + deleteException.getMessage() + ), + log + )); + + futureMap.computeIfAbsent(topicId, k -> new HashMap<>()) + .put(partitionData.partition(), deleteFuture); + } + } + + // Combine all futures into a single CompletableFuture. + CompletableFuture combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture into CompletableFuture. + return combinedFuture.thenApply(v -> { + List deleteStateResult = new ArrayList<>(futureMap.size()); + futureMap.forEach( + (topicId, topicEntry) -> { + List partitionResults = new ArrayList<>(topicEntry.size()); + topicEntry.forEach( + (partitionId, responseFuture) -> { + // ResponseFut would already be completed by now since we have used + // CompletableFuture::allOf to create a combined future from the future map. + partitionResults.add( + responseFuture.getNow(null).results().get(0).partitions().get(0) + ); + } + ); + deleteStateResult.add(DeleteShareGroupStateResponse.toResponseDeleteStateResult(topicId, partitionResults)); + } + ); + return new DeleteShareGroupStateResponseData() + .setResults(deleteStateResult); + }); + } + private ReadShareGroupStateResponseData generateErrorReadStateResponse( ReadShareGroupStateRequestData request, Errors error, @@ -746,6 +859,23 @@ public class ShareCoordinatorService implements ShareCoordinator { }).collect(Collectors.toList())); } + private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse( + DeleteShareGroupStateRequestData request, + Errors error, + String errorMessage + ) { + return new DeleteShareGroupStateResponseData().setResults(request.topics().stream() + .map(topicData -> { + DeleteShareGroupStateResponseData.DeleteStateResult resultData = new DeleteShareGroupStateResponseData.DeleteStateResult(); + resultData.setTopicId(topicData.topicId()); + resultData.setPartitions(topicData.partitions().stream() + .map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult( + partitionData.partition(), error, errorMessage + )).collect(Collectors.toList())); + return resultData; + }).collect(Collectors.toList())); + } + private static boolean isGroupIdEmpty(String groupId) { return groupId == null || groupId.isEmpty(); } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 53a64174f38..a71a911907a 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; +import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -28,6 +30,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.TransactionResult; @@ -228,16 +231,25 @@ public class ShareCoordinatorShard implements CoordinatorShard= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) { - snapshotUpdateCount.put(mapKey, 0); + ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value); + // This record is the complete snapshot. + shareStateMap.put(mapKey, offsetRecord); + // If number of share updates is exceeded, then reset it. + if (snapshotUpdateCount.containsKey(mapKey)) { + if (snapshotUpdateCount.get(mapKey) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) { + snapshotUpdateCount.put(mapKey, 0); + } } } @@ -475,6 +487,49 @@ public class ShareCoordinatorShard implements CoordinatorShard + * This method as called by the ShareCoordinatorService will be provided with + * the request data which covers only key i.e. group1:topic1:partition1. The implementation + * below was done keeping this in mind. + * + * @param request - ReadShareGroupStateSummaryRequestData for a single key + * @return CoordinatorResult(records, response) + */ + + public CoordinatorResult deleteState( + DeleteShareGroupStateRequestData request + ) { + // Records to write (with both key and value of snapshot type), response to caller + // only one key will be there in the request by design. + Optional> error = maybeGetDeleteStateError(request); + if (error.isPresent()) { + return error.get(); + } + + DeleteShareGroupStateRequestData.DeleteStateData topicData = request.topics().get(0); + DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition()); + + CoordinatorRecord record = generateTombstoneRecord(key); + // build successful response if record is correctly created + DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData() + .setResults( + List.of( + DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(), + List.of( + DeleteShareGroupStateResponse.toResponsePartitionResult( + key.partition() + ) + ) + ) + ) + ); + + return new CoordinatorResult<>(Collections.singletonList(record), responseData); + } + /** * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions. *

@@ -537,6 +592,14 @@ public class ShareCoordinatorShard implements CoordinatorShard mergeBatches( List soFar, WriteShareGroupStateRequestData.PartitionData partitionData) { @@ -670,6 +733,37 @@ public class ShareCoordinatorShard implements CoordinatorShard> maybeGetDeleteStateError( + DeleteShareGroupStateRequestData request + ) { + DeleteShareGroupStateRequestData.DeleteStateData topicData = request.topics().get(0); + DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + + Uuid topicId = topicData.topicId(); + int partitionId = partitionData.partition(); + + if (topicId == null) { + return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId)); + } + + if (partitionId < 0) { + return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId)); + } + + if (metadataImage == null) { + log.error("Metadata image is null"); + return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + } + + if (metadataImage.topics().getTopic(topicId) == null || + metadataImage.topics().getPartition(topicId, partitionId) == null) { + log.error("Topic/TopicPartition not found in metadata image."); + return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + } + + return Optional.empty(); + } + private CoordinatorResult getWriteErrorResponse( Errors error, Exception exception, @@ -681,6 +775,17 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.emptyList(), responseData); } + private CoordinatorResult getDeleteErrorResponse( + Errors error, + Exception exception, + Uuid topicId, + int partitionId + ) { + String message = exception == null ? error.message() : exception.getMessage(); + DeleteShareGroupStateResponseData responseData = DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message); + return new CoordinatorResult<>(Collections.emptyList(), responseData); + } + // Visible for testing Integer getLeaderMapValue(SharePartitionKey key) { return this.leaderEpochMap.get(key); diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java index 37d9f9b5127..9de59e499d1 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java @@ -112,4 +112,25 @@ public class ShareCoordinatorRecordHelpersTest { assertEquals(expectedRecord, record); } + + @Test + public void testNewShareStateTombstoneRecord() { + String groupId = "test-group"; + Uuid topicId = Uuid.randomUuid(); + int partitionId = 1; + CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + groupId, + topicId, + partitionId + ); + + CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone( + new ShareSnapshotKey() + .setGroupId(groupId) + .setTopicId(topicId) + .setPartition(partitionId) + ); + + assertEquals(expectedRecord, record); + } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 7f7776cd93e..331595119b6 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; +import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -49,7 +51,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -79,7 +80,7 @@ class ShareCoordinatorServiceTest { private CoordinatorRuntime mockRuntime() { CoordinatorRuntime runtime = mock(CoordinatorRuntime.class); when(runtime.activeTopicPartitions()) - .thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0))); + .thenReturn(List.of(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0))); return runtime; } @@ -133,13 +134,13 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition1) .setStartOffset(0) .setStateEpoch(1) .setLeaderEpoch(1) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -148,13 +149,13 @@ class ShareCoordinatorServiceTest { )), new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition2) .setStartOffset(0) .setStateEpoch(1) .setLeaderEpoch(1) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -165,18 +166,18 @@ class ShareCoordinatorServiceTest { ); WriteShareGroupStateResponseData response1 = new WriteShareGroupStateResponseData() - .setResults(Collections.singletonList( + .setResults(List.of( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition1))) )); WriteShareGroupStateResponseData response2 = new WriteShareGroupStateResponseData() - .setResults(Collections.singletonList( + .setResults(List.of( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition2))) )); @@ -199,11 +200,11 @@ class ShareCoordinatorServiceTest { HashSet expectedResult = new HashSet<>(Arrays.asList( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition2))), new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition1))))); assertEquals(expectedResult, result); verify(time, times(2)).hiResClockMs(); @@ -243,14 +244,14 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition1) .setLeaderEpoch(1) )), new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition2) .setLeaderEpoch(1) @@ -260,12 +261,12 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateResponseData.ReadStateResult topicData1 = new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() .setPartition(partition1) .setErrorCode(Errors.NONE.code()) .setStateEpoch(1) .setStartOffset(0) - .setStateBatches(Collections.singletonList(new ReadShareGroupStateResponseData.StateBatch() + .setStateBatches(List.of(new ReadShareGroupStateResponseData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -275,7 +276,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateResponseData.ReadStateResult topicData2 = new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() .setPartition(partition2) .setErrorCode(Errors.NONE.code()) .setStateEpoch(1) @@ -301,9 +302,9 @@ class ShareCoordinatorServiceTest { any() )) .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateResponseData() - .setResults(Collections.singletonList(topicData1)))) + .setResults(List.of(topicData1)))) .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateResponseData() - .setResults(Collections.singletonList(topicData2)))); + .setResults(List.of(topicData2)))); CompletableFuture future = service.readState( requestContext(ApiKeys.READ_SHARE_GROUP_STATE), @@ -345,14 +346,14 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition1) .setLeaderEpoch(1) )), new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition2) .setLeaderEpoch(1) @@ -362,7 +363,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition1) .setErrorCode(Errors.NONE.code()) .setStateEpoch(1) @@ -371,7 +372,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData2 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition2) .setErrorCode(Errors.NONE.code()) .setStateEpoch(1) @@ -385,9 +386,9 @@ class ShareCoordinatorServiceTest { any() )) .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() - .setResults(Collections.singletonList(topicData1)))) + .setResults(List.of(topicData1)))) .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() - .setResults(Collections.singletonList(topicData2)))); + .setResults(List.of(topicData2)))); CompletableFuture future = service.readStateSummary( requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), @@ -402,6 +403,93 @@ class ShareCoordinatorServiceTest { assertEquals(expectedResult, result); } + @Test + public void testDeleteStateSuccess() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + Metrics metrics = new Metrics(); + ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics); + Time time = mock(Time.class); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + coordinatorMetrics, + time, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(Arrays.asList( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition1) + )), + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition2) + )) + ) + ); + + DeleteShareGroupStateResponseData response1 = new DeleteShareGroupStateResponseData() + .setResults(List.of( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition1))) + )); + + DeleteShareGroupStateResponseData response2 = new DeleteShareGroupStateResponseData() + .setResults(List.of( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition2))) + )); + + when(runtime.scheduleWriteOperation( + eq("delete-share-group-state"), + eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), + eq(Duration.ofMillis(5000)), + any() + )) + .thenReturn(CompletableFuture.completedFuture(response1)) + .thenReturn(CompletableFuture.completedFuture(response2)); + + CompletableFuture future = service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(Arrays.asList( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition2))), + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition1))))); + assertEquals(expectedResult, result); + } + @Test public void testWriteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -433,7 +521,7 @@ class ShareCoordinatorServiceTest { assertEquals(new WriteShareGroupStateResponseData(), service.writeState( requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE), - new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList( + new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of( new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId))) ).get(5, TimeUnit.SECONDS) ); @@ -442,8 +530,8 @@ class ShareCoordinatorServiceTest { assertEquals(new WriteShareGroupStateResponseData(), service.writeState( requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE), - new WriteShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList( - new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(Collections.singletonList( + new WriteShareGroupStateRequestData().setGroupId(null).setTopics(List.of( + new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(List.of( new WriteShareGroupStateRequestData.PartitionData().setPartition(partition))))) ).get(5, TimeUnit.SECONDS) ); @@ -480,7 +568,7 @@ class ShareCoordinatorServiceTest { assertEquals(new ReadShareGroupStateResponseData(), service.readState( requestContext(ApiKeys.READ_SHARE_GROUP_STATE), - new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList( + new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of( new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId))) ).get(5, TimeUnit.SECONDS) ); @@ -489,8 +577,8 @@ class ShareCoordinatorServiceTest { assertEquals(new ReadShareGroupStateResponseData(), service.readState( requestContext(ApiKeys.READ_SHARE_GROUP_STATE), - new ReadShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList( - new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(Collections.singletonList( + new ReadShareGroupStateRequestData().setGroupId(null).setTopics(List.of( + new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(List.of( new ReadShareGroupStateRequestData.PartitionData().setPartition(partition))))) ).get(5, TimeUnit.SECONDS) ); @@ -527,7 +615,7 @@ class ShareCoordinatorServiceTest { assertEquals(new ReadShareGroupStateSummaryResponseData(), service.readStateSummary( requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), - new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(List.of( new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId))) ).get(5, TimeUnit.SECONDS) ); @@ -536,13 +624,60 @@ class ShareCoordinatorServiceTest { assertEquals(new ReadShareGroupStateSummaryResponseData(), service.readStateSummary( requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), - new ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(Collections.singletonList( - new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(List.of( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(List.of( new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partition))))) ).get(5, TimeUnit.SECONDS) ); } + @Test + public void testDeleteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + // 1. Empty topicsData + assertEquals(new DeleteShareGroupStateResponseData(), + service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + new DeleteShareGroupStateRequestData().setGroupId(groupId) + ).get(5, TimeUnit.SECONDS) + ); + + // 2. Empty partitionsData + assertEquals(new DeleteShareGroupStateResponseData(), + service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + new DeleteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId))) + ).get(5, TimeUnit.SECONDS) + ); + + // 3. Invalid groupId + assertEquals(new DeleteShareGroupStateResponseData(), + service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + new DeleteShareGroupStateRequestData().setGroupId(null).setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition))))) + ).get(5, TimeUnit.SECONDS) + ); + } + @Test public void testWriteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -568,13 +703,13 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition1) .setStartOffset(0) .setStateEpoch(1) .setLeaderEpoch(1) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -583,13 +718,13 @@ class ShareCoordinatorServiceTest { )), new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition2) .setStartOffset(0) .setStateEpoch(1) .setLeaderEpoch(1) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -609,13 +744,13 @@ class ShareCoordinatorServiceTest { HashSet expectedResult = new HashSet<>(Arrays.asList( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition2) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))), new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition1) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))))); @@ -647,14 +782,14 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition1) .setLeaderEpoch(1) )), new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition2) .setLeaderEpoch(1) @@ -672,13 +807,13 @@ class ShareCoordinatorServiceTest { HashSet expectedResult = new HashSet<>(Arrays.asList( new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() .setPartition(partition2) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))), new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() .setPartition(partition1) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))))); @@ -710,14 +845,14 @@ class ShareCoordinatorServiceTest { .setTopics(Arrays.asList( new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId1) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition1) .setLeaderEpoch(1) )), new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId2) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition2) .setLeaderEpoch(1) @@ -735,13 +870,74 @@ class ShareCoordinatorServiceTest { HashSet expectedResult = new HashSet<>(Arrays.asList( new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId2) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition2) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))), new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId1) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition1) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Share coordinator is not available."))))); + assertEquals(expectedResult, result); + } + + @Test + public void testDeleteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(Arrays.asList( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition1) + )), + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition2) + )) + ) + ); + + CompletableFuture future = service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(Arrays.asList( + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition2) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Share coordinator is not available."))), + new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() .setPartition(partition1) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Share coordinator is not available."))))); @@ -771,23 +967,23 @@ class ShareCoordinatorServiceTest { .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())); assertEquals(new WriteShareGroupStateResponseData() - .setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult() + .setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() .setPartition(partition) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorMessage("Unable to write share group state: This server does not host this topic-partition."))))), service.writeState( requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE), new WriteShareGroupStateRequestData().setGroupId(groupId) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition) .setLeaderEpoch(1) .setStartOffset(1) .setStateEpoch(1) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(2) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -821,18 +1017,18 @@ class ShareCoordinatorServiceTest { .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())); assertEquals(new ReadShareGroupStateResponseData() - .setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult() + .setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() .setPartition(partition) .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) .setErrorMessage("Unable to read share group state: The server experienced an unexpected error when processing the request."))))), service.readState( requestContext(ApiKeys.READ_SHARE_GROUP_STATE), new ReadShareGroupStateRequestData().setGroupId(groupId) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition) .setLeaderEpoch(1) )) @@ -864,18 +1060,18 @@ class ShareCoordinatorServiceTest { .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())); assertEquals(new ReadShareGroupStateSummaryResponseData() - .setResults(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) .setErrorMessage("Unable to read share group state summary: The server experienced an unexpected error when processing the request."))))), service.readStateSummary( requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), new ReadShareGroupStateSummaryRequestData().setGroupId(groupId) - .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition) .setLeaderEpoch(1) )) @@ -884,6 +1080,48 @@ class ShareCoordinatorServiceTest { ); } + @Test + public void testDeleteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + when(runtime.scheduleWriteOperation(any(), any(), any(), any())) + .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())); + + assertEquals(new DeleteShareGroupStateResponseData() + .setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult() + .setTopicId(topicId) + .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Unable to delete share group state: This server does not host this topic-partition."))))), + service.deleteState( + requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), + new DeleteShareGroupStateRequestData().setGroupId(groupId) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ).get(5, TimeUnit.SECONDS) + ); + } + @Test public void testTopicPartitionFor() { CoordinatorRuntime runtime = mockRuntime(); diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index fcd6933f1e3..7ebb5ce3954 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; +import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -26,6 +28,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; @@ -40,6 +43,7 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.share.SharePartitionKey; @@ -58,10 +62,12 @@ import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -957,6 +963,216 @@ class ShareCoordinatorShardTest { verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); } + @Test + public void testDeleteStateSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult result = shard.deleteState(request); + + // apply a record in to verify delete + CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, + TOPIC_ID, + PARTITION, + new ShareGroupOffset.Builder() + .setSnapshotEpoch(0) + .setStateEpoch(0) + .setLeaderEpoch(0) + .setStateBatches(List.of( + new PersisterStateBatch( + 0, + 10, + (byte) 0, + (short) 1 + ) + ) + ) + .build() + ); + shard.replay(0L, 0L, (short) 0, record); + assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateFirstRecordDeleteSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult result = shard.deleteState(request); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateInvalidRequestData() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + // invalid partition + int partition = -1; + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition))))); + + CoordinatorResult result = shard.deleteState(request); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + assertEquals(expectedRecords, result.records()); + } + + @Test + public void testDeleteNullMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + shard.onNewMetadataImage(null, null); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0))))); + + CoordinatorResult result = shard.deleteState(request); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + } + + @Test + public void testDeleteTopicIdNonExistentInMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + MetadataImage image = mock(MetadataImage.class); + shard.onNewMetadataImage(image, null); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0))))); + + // topic id not found in cache + TopicsImage topicsImage = mock(TopicsImage.class); + when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn( + null + ); + when(image.topics()).thenReturn( + topicsImage + ); + CoordinatorResult result = shard.deleteState(request); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID)); + } + + @Test + public void testDeletePartitionIdNonExistentInMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + MetadataImage image = mock(MetadataImage.class); + shard.onNewMetadataImage(image, null); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(0))))); + + // topic id found in cache + TopicsImage topicsImage = mock(TopicsImage.class); + when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn( + mock(TopicImage.class) + ); + when(image.topics()).thenReturn( + topicsImage + ); + + // partition id not found + when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn( + null + ); + CoordinatorResult result = shard.deleteState(request); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID)); + verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0)); + } + private static ShareGroupOffset groupOffset(ApiMessage record) { if (record instanceof ShareSnapshotValue) { return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);