From 4f28973bd19cb94986415765f55cf5ab5bfae3e6 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Sat, 22 Feb 2025 21:42:08 +0530 Subject: [PATCH] KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#18968) In this PR, we have added the share coordinator and KafkaApis side impl of the intialize share group state RPC. ref: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI Reviewers: Andrew Schofield --- .../InitializeShareGroupStateRequest.java | 22 +- .../InitializeShareGroupStateResponse.java | 70 +++- .../main/scala/kafka/server/KafkaApis.scala | 30 +- .../unit/kafka/server/KafkaApisTest.scala | 121 ++++++ .../coordinator/share/ShareCoordinator.java | 10 + .../share/ShareCoordinatorService.java | 146 ++++++- .../share/ShareCoordinatorShard.java | 205 +++++++--- .../coordinator/share/ShareGroupOffset.java | 21 +- .../share/ShareCoordinatorServiceTest.java | 361 ++++++++++++++---- .../share/ShareCoordinatorShardTest.java | 181 +++++++++ 10 files changed, 1007 insertions(+), 160 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java index fc9abc71613..15d50acaa37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java @@ -34,7 +34,7 @@ public class InitializeShareGroupStateRequest extends AbstractRequest { private final InitializeShareGroupStateRequestData data; public Builder(InitializeShareGroupStateRequestData data) { - this(data, false); + this(data, true); } public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) { @@ -64,15 +64,15 @@ public class InitializeShareGroupStateRequest extends AbstractRequest { public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) { List results = new ArrayList<>(); data.topics().forEach( - topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult() - .setTopicId(topicResult.topicId()) - .setPartitions(topicResult.partitions().stream() - .map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult() - .setPartition(partitionData.partition()) - .setErrorCode(Errors.forException(e).code())) - .collect(Collectors.toList())))); + topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicResult.topicId()) + .setPartitions(topicResult.partitions().stream() + .map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partitionData.partition()) + .setErrorCode(Errors.forException(e).code())) + .collect(Collectors.toList())))); return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData() - .setResults(results)); + .setResults(results)); } @Override @@ -82,8 +82,8 @@ public class InitializeShareGroupStateRequest extends AbstractRequest { public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) { return new InitializeShareGroupStateRequest( - new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version), - version + new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version), + version ); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java index 44880c2cb86..be88f0944d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java @@ -17,13 +17,17 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class InitializeShareGroupStateResponse extends AbstractResponse { @@ -43,9 +47,9 @@ public class InitializeShareGroupStateResponse extends AbstractResponse { public Map errorCounts() { Map counts = new HashMap<>(); data.results().forEach( - result -> result.partitions().forEach( - partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) - ) + result -> result.partitions().forEach( + partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) + ) ); return counts; } @@ -62,7 +66,65 @@ public class InitializeShareGroupStateResponse extends AbstractResponse { public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) { return new InitializeShareGroupStateResponse( - new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version) + new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version) ); } + + public static InitializeShareGroupStateResponseData toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors error) { + List initStateResults = new ArrayList<>(); + request.topics().forEach(topicData -> { + List partitionResults = new ArrayList<>(); + topicData.partitions().forEach(partitionData -> partitionResults.add( + toErrorResponsePartitionResult(partitionData.partition(), error, error.message())) + ); + initStateResults.add(toResponseInitializeStateResult(topicData.topicId(), partitionResults)); + }); + return new InitializeShareGroupStateResponseData().setResults(initStateResults); + } + + public static InitializeShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult( + int partitionId, + Errors error, + String errorMessage + ) { + return new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static InitializeShareGroupStateResponseData.InitializeStateResult toResponseInitializeStateResult( + Uuid topicId, + List partitionResults + ) { + return new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(partitionResults); + } + + public static InitializeShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) { + return new InitializeShareGroupStateResponseData().setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage))) + )); + } + + public static InitializeShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) { + return new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId); + } + + public static InitializeShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) { + return new InitializeShareGroupStateResponseData().setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partitionId) + )) + )); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7adf9c7c3cb..aa1f392506e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = { + def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest] - // TODO: Implement the InitializeShareGroupStateRequest handling - requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!authorizeClusterOperation(request, CLUSTER_ACTION)) { + requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toGlobalErrorResponse( + initializeShareGroupStateRequest.data(), + Errors.CLUSTER_AUTHORIZATION_FAILED + ))) + return CompletableFuture.completedFuture[Unit](()) + } + + shareCoordinator match { + case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs, + new ApiException("Share coordinator is not enabled."))) + CompletableFuture.completedFuture[Unit](()) + + case Some(coordinator) => coordinator.initializeState(request.context, initializeShareGroupStateRequest.data) + .handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(response)) + } + } + } } def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 0670bf0b36d..5f30cfd83e3 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging { }) } + @Test + def testInitializeShareGroupStateSuccess(): Unit = { + val topicId = Uuid.randomUuid(); + val initRequestData = new InitializeShareGroupStateRequestData() + .setGroupId("group1") + .setTopics(List( + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(1) + .setStateEpoch(0) + ).asJava) + ).asJava) + + val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + ).asJava) + ).asJava + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getInitializeShareGroupResponse( + initRequestData, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = true, + null, + initStateResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + } + + @Test + def testInitializeShareGroupStateAuthorizationFailed(): Unit = { + val topicId = Uuid.randomUuid(); + val initRequestData = new InitializeShareGroupStateRequestData() + .setGroupId("group1") + .setTopics(List( + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(1) + .setStateEpoch(0) + ).asJava) + ).asJava) + + val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + ).asJava) + ).asJava + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getInitializeShareGroupResponse( + initRequestData, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = false, + authorizer, + initStateResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + response.data.results.forEach(deleteResult => { + assertEquals(1, deleteResult.partitions.size) + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode()) + }) + } + def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty, verifyNoErr: Boolean = true, authorizer: Authorizer = null, describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = { @@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging { } response } + + def getInitializeShareGroupResponse(requestData: InitializeShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty, + verifyNoErr: Boolean = true, authorizer: Authorizer = null, + initStateResult: util.List[InitializeShareGroupStateResponseData.InitializeStateResult]): InitializeShareGroupStateResponse = { + val requestChannelRequest = buildRequest(new InitializeShareGroupStateRequest.Builder(requestData, true).build()) + + val future = new CompletableFuture[InitializeShareGroupStateResponseData]() + when(shareCoordinator.initializeState( + any[RequestContext], + any[InitializeShareGroupStateRequestData] + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + kafkaApis = createKafkaApis( + overrideProperties = configOverrides, + authorizer = Option(authorizer), + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching()) + + future.complete(new InitializeShareGroupStateResponseData() + .setResults(initStateResult)) + + val response = verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest) + if (verifyNoErr) { + val expectedInitShareGroupStateResponseData = new InitializeShareGroupStateResponseData() + .setResults(initStateResult) + assertEquals(expectedInitShareGroupStateResponseData, response.data) + } + response + } } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java index 61e96e37f07..f04cb1385f0 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java @@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -96,6 +98,14 @@ public interface ShareCoordinator { */ CompletableFuture deleteState(RequestContext context, DeleteShareGroupStateRequestData request); + /** + * Handle initialize share group state call + * @param context - represents the incoming initialize share group request context + * @param request - actual RPC request object + * @return completable future representing initialize share group RPC response data + */ + CompletableFuture initializeState(RequestContext context, InitializeShareGroupStateRequestData request); + /** * Called when new coordinator is elected * @param partitionIndex - The partition index (internal topic) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 87071b3f97d..764e008136e 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -32,6 +34,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.RequestContext; @@ -70,10 +73,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntSupplier; -import java.util.stream.Collectors; import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException; +@SuppressWarnings("ClassDataAbstractionCoupling") public class ShareCoordinatorService implements ShareCoordinator { private final ShareCoordinatorConfig config; private final Logger log; @@ -682,11 +685,11 @@ public class ShareCoordinatorService implements ShareCoordinator { (topicId, topicEntry) -> { List partitionResults = new ArrayList<>(topicEntry.size()); topicEntry.forEach( - (partitionId, responseFuture) -> { + (partitionId, responseFut) -> { // ResponseFut would already be completed by now since we have used // CompletableFuture::allOf to create a combined future from the future map. partitionResults.add( - responseFuture.getNow(null).results().get(0).partitions().get(0) + responseFut.getNow(null).results().get(0).partitions().get(0) ); } ); @@ -792,11 +795,11 @@ public class ShareCoordinatorService implements ShareCoordinator { (topicId, topicEntry) -> { List partitionResults = new ArrayList<>(topicEntry.size()); topicEntry.forEach( - (partitionId, responseFuture) -> { + (partitionId, responseFut) -> { // ResponseFut would already be completed by now since we have used // CompletableFuture::allOf to create a combined future from the future map. partitionResults.add( - responseFuture.getNow(null).results().get(0).partitions().get(0) + responseFut.getNow(null).results().get(0).partitions().get(0) ); } ); @@ -808,6 +811,106 @@ public class ShareCoordinatorService implements ShareCoordinator { }); } + @Override + public CompletableFuture initializeState(RequestContext context, InitializeShareGroupStateRequestData request) { + // Send an empty response if the coordinator is not active. + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorInitStateResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + String groupId = request.groupId(); + // Send an empty response if groupId is invalid. + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // Send an empty response if topic data is empty. + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // A map to store the futures for each topicId and partition. + Map>> futureMap = new HashMap<>(); + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass + // onto the shard method. + + for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) { + Uuid topicId = topicData.topicId(); + for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) { + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + + InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List.of(partitionData)))); + + CompletableFuture initializeFuture = runtime.scheduleWriteOperation( + "initialize-share-group-state", + topicPartitionFor(coordinatorKey), + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + coordinator -> coordinator.initializeState(requestForCurrentPartition) + ).exceptionally(initializeException -> + handleOperationException( + "initialize-share-group-state", + request, + initializeException, + (error, message) -> InitializeShareGroupStateResponse.toErrorResponseData( + topicData.topicId(), + partitionData.partition(), + error, + "Unable to initialize share group state: " + initializeException.getMessage() + ), + log + )); + + futureMap.computeIfAbsent(topicId, k -> new HashMap<>()) + .put(partitionData.partition(), initializeFuture); + } + } + + // Combine all futures into a single CompletableFuture. + CompletableFuture combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture into CompletableFuture. + return combinedFuture.thenApply(v -> { + List initializeStateResult = new ArrayList<>(futureMap.size()); + futureMap.forEach( + (topicId, topicEntry) -> { + List partitionResults = new ArrayList<>(topicEntry.size()); + topicEntry.forEach( + (partitionId, responseFut) -> { + // ResponseFut would already be completed by now since we have used + // CompletableFuture::allOf to create a combined future from the future map. + partitionResults.add( + responseFut.getNow(null).results().get(0).partitions().get(0) + ); + } + ); + initializeStateResult.add(InitializeShareGroupStateResponse.toResponseInitializeStateResult(topicId, partitionResults)); + } + ); + return new InitializeShareGroupStateResponseData() + .setResults(initializeStateResult); + }); + } + private ReadShareGroupStateResponseData generateErrorReadStateResponse( ReadShareGroupStateRequestData request, Errors error, @@ -820,9 +923,9 @@ public class ShareCoordinatorService implements ShareCoordinator { resultData.setPartitions(topicData.partitions().stream() .map(partitionData -> ReadShareGroupStateResponse.toErrorResponsePartitionResult( partitionData.partition(), error, errorMessage - )).collect(Collectors.toList())); + )).toList()); return resultData; - }).collect(Collectors.toList())); + }).toList()); } private ReadShareGroupStateSummaryResponseData generateErrorReadStateSummaryResponse( @@ -837,9 +940,9 @@ public class ShareCoordinatorService implements ShareCoordinator { resultData.setPartitions(topicData.partitions().stream() .map(partitionData -> ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult( partitionData.partition(), error, errorMessage - )).collect(Collectors.toList())); + )).toList()); return resultData; - }).collect(Collectors.toList())); + }).toList()); } private WriteShareGroupStateResponseData generateErrorWriteStateResponse( @@ -855,9 +958,9 @@ public class ShareCoordinatorService implements ShareCoordinator { resultData.setPartitions(topicData.partitions().stream() .map(partitionData -> WriteShareGroupStateResponse.toErrorResponsePartitionResult( partitionData.partition(), error, errorMessage - )).collect(Collectors.toList())); + )).toList()); return resultData; - }).collect(Collectors.toList())); + }).toList()); } private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse( @@ -872,9 +975,26 @@ public class ShareCoordinatorService implements ShareCoordinator { resultData.setPartitions(topicData.partitions().stream() .map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult( partitionData.partition(), error, errorMessage - )).collect(Collectors.toList())); + )).toList()); return resultData; - }).collect(Collectors.toList())); + }).toList()); + } + + private InitializeShareGroupStateResponseData generateErrorInitStateResponse( + InitializeShareGroupStateRequestData request, + Errors error, + String errorMessage + ) { + return new InitializeShareGroupStateResponseData().setResults(request.topics().stream() + .map(topicData -> { + InitializeShareGroupStateResponseData.InitializeStateResult resultData = new InitializeShareGroupStateResponseData.InitializeStateResult(); + resultData.setTopicId(topicData.topicId()); + resultData.setPartitions(topicData.partitions().stream() + .map(partitionData -> InitializeShareGroupStateResponse.toErrorResponsePartitionResult( + partitionData.partition(), error, errorMessage + )).toList()); + return resultData; + }).toList()); } private static boolean isGroupIdEmpty(String groupId) { diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index a71a911907a..1022a36fb65 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -31,6 +33,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.TransactionResult; @@ -66,7 +69,6 @@ import org.slf4j.Logger; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; public class ShareCoordinatorShard implements CoordinatorShard { private final Logger log; @@ -317,16 +319,12 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); } @@ -346,7 +344,7 @@ public class ShareCoordinatorShard implements CoordinatorShard error = maybeGetReadStateError(request); if (error.isPresent()) { - return new CoordinatorResult<>(Collections.emptyList(), error.get()); + return new CoordinatorResult<>(List.of(), error.get()); } ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0); @@ -366,7 +364,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.emptyList(), responseData); + return new CoordinatorResult<>(List.of(), responseData); } // It is OK to info log this since this reaching this codepoint should be quite infrequent. @@ -406,7 +404,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.emptyList(), responseData); + return new CoordinatorResult<>(List.of(), responseData); } /** @@ -482,7 +480,7 @@ public class ShareCoordinatorShard implements CoordinatorShard, CoordinatorRecord> lastRedundantOffset() { return new CoordinatorResult<>( - Collections.emptyList(), + List.of(), this.offsetsManager.lastRedundantOffset() ); } @@ -494,7 +492,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); } + /** + * This method writes a share snapshot records corresponding to the requested topic partitions. + *

+ * This method as called by the ShareCoordinatorService will be provided with + * the request data which covers only key i.e. group1:topic1:partition1. The implementation + * below was done keeping this in mind. + * + * @param request - InitializeShareGroupStateRequestData for a single key + * @return CoordinatorResult(records, response) + */ + + public CoordinatorResult initializeState( + InitializeShareGroupStateRequestData request + ) { + // Records to write (with both key and value of snapshot type), response to caller + // only one key will be there in the request by design. + Optional> error = maybeGetInitializeStateError(request); + if (error.isPresent()) { + return error.get(); + } + + InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0); + InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition()); + + CoordinatorRecord record = generateInitializeStateRecord(partitionData, key); + // build successful response if record is correctly created + InitializeShareGroupStateResponseData responseData = new InitializeShareGroupStateResponseData().setResults( + List.of(InitializeShareGroupStateResponse.toResponseInitializeStateResult(key.topicId(), + List.of(InitializeShareGroupStateResponse.toResponsePartitionResult( + key.partition())) + )) + ); + + return new CoordinatorResult<>(List.of(record), responseData); + } + /** * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions. *

@@ -555,7 +584,7 @@ public class ShareCoordinatorShard implements CoordinatorShard= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) { ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true @@ -587,7 +616,7 @@ public class ShareCoordinatorShard implements CoordinatorShard mergeBatches( List soFar, WriteShareGroupStateRequestData.PartitionData partitionData) { @@ -609,15 +656,13 @@ public class ShareCoordinatorShard implements CoordinatorShard mergeBatches( List soFar, WriteShareGroupStateRequestData.PartitionData partitionData, - long startOffset) { - return new PersisterStateBatchCombiner( - soFar, - partitionData.stateBatches().stream() - .map(PersisterStateBatch::from) - .collect(Collectors.toList()), + long startOffset + ) { + return new PersisterStateBatchCombiner(soFar, partitionData.stateBatches().stream() + .map(PersisterStateBatch::from) + .toList(), startOffset - ) - .combineStateBatches(); + ).combineStateBatches(); } private Optional> maybeGetWriteStateError( @@ -631,30 +676,30 @@ public class ShareCoordinatorShard implements CoordinatorShard partitionData.leaderEpoch()) { log.error("Request leader epoch smaller than last recorded."); - return Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId)); + return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId)); } if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) { log.error("Request state epoch smaller than last recorded."); - return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId)); + return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId)); } if (metadataImage == null) { log.error("Metadata image is null"); - return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); } if (metadataImage.topics().getTopic(topicId) == null || metadataImage.topics().getPartition(topicId, partitionId) == null) { log.error("Topic/TopicPartition not found in metadata image."); - return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); } return Optional.empty(); @@ -743,28 +788,65 @@ public class ShareCoordinatorShard implements CoordinatorShard getWriteErrorResponse( + private Optional> maybeGetInitializeStateError( + InitializeShareGroupStateRequestData request + ) { + InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0); + InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + + Uuid topicId = topicData.topicId(); + int partitionId = partitionData.partition(); + + if (topicId == null) { + return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId)); + } + + if (partitionId < 0) { + return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId)); + } + + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId); + if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) && stateEpochMap.get(key) > partitionData.stateEpoch()) { + log.error("Initialize request state epoch smaller than last recorded."); + return Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId)); + } + + if (metadataImage == null) { + log.error("Metadata image is null"); + return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + } + + if (metadataImage.topics().getTopic(topicId) == null || + metadataImage.topics().getPartition(topicId, partitionId) == null) { + log.error("Topic/TopicPartition not found in metadata image."); + return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId)); + } + + return Optional.empty(); + } + + private CoordinatorResult getWriteErrorCoordinatorResult( Errors error, Exception exception, Uuid topicId, @@ -772,10 +854,10 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.emptyList(), responseData); + return new CoordinatorResult<>(List.of(), responseData); } - private CoordinatorResult getDeleteErrorResponse( + private CoordinatorResult getDeleteErrorCoordinatorResult( Errors error, Exception exception, Uuid topicId, @@ -783,7 +865,18 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.emptyList(), responseData); + return new CoordinatorResult<>(List.of(), responseData); + } + + private CoordinatorResult getInitializeErrorCoordinatorResult( + Errors error, + Exception exception, + Uuid topicId, + int partitionId + ) { + String message = exception == null ? error.message() : exception.getMessage(); + InitializeShareGroupStateResponseData responseData = InitializeShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message); + return new CoordinatorResult<>(List.of(), responseData); } // Visible for testing @@ -820,7 +913,7 @@ public class ShareCoordinatorShard implements CoordinatorShard stateBatchAsSet() { diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 8738db4dd9a..399643e32a9 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -51,7 +53,6 @@ import org.apache.kafka.server.util.timer.Timer; import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -133,7 +134,7 @@ class ShareCoordinatorServiceTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId1) .setPartitions(List.of( @@ -199,7 +200,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId2) .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() @@ -210,7 +211,7 @@ class ShareCoordinatorServiceTest { .setPartition(partition1))))); assertEquals(expectedResult, result); verify(time, times(2)).hiResClockMs(); - Set expectedMetrics = new HashSet<>(Arrays.asList( + Set expectedMetrics = new HashSet<>(List.of( metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), @@ -243,7 +244,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId1) .setPartitions(List.of( @@ -283,7 +284,7 @@ class ShareCoordinatorServiceTest { .setErrorCode(Errors.NONE.code()) .setStateEpoch(1) .setStartOffset(0) - .setStateBatches(Arrays.asList( + .setStateBatches(List.of( new ReadShareGroupStateResponseData.StateBatch() .setFirstOffset(0) .setLastOffset(10) @@ -315,7 +316,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( topicData1, topicData2)); assertEquals(expectedResult, result); @@ -345,23 +346,20 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( - new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() - .setTopicId(topicId1) - .setPartitions(List.of( - new ReadShareGroupStateSummaryRequestData.PartitionData() - .setPartition(partition1) - .setLeaderEpoch(1) - )), - new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() - .setTopicId(topicId2) - .setPartitions(List.of( - new ReadShareGroupStateSummaryRequestData.PartitionData() - .setPartition(partition2) - .setLeaderEpoch(1) - )) - ) - ); + .setTopics(List.of( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId1) + .setPartitions(List.of( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition1) + .setLeaderEpoch(1))), + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId2) + .setPartitions(List.of( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition2) + .setLeaderEpoch(1))) + )); ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId1) @@ -385,12 +383,12 @@ class ShareCoordinatorServiceTest { eq("read-share-group-state-summary"), eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), any(), - any() - )) - .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() - .setResults(List.of(topicData1)))) - .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() - .setResults(List.of(topicData2)))); + any()) + ).thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() + .setResults(List.of(topicData1))) + ).thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() + .setResults(List.of(topicData2))) + ); CompletableFuture future = service.readStateSummary( requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), @@ -399,7 +397,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( topicData1, topicData2)); assertEquals(expectedResult, result); @@ -432,21 +430,18 @@ class ShareCoordinatorServiceTest { DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( - new DeleteShareGroupStateRequestData.DeleteStateData() - .setTopicId(topicId1) - .setPartitions(List.of( - new DeleteShareGroupStateRequestData.PartitionData() - .setPartition(partition1) - )), - new DeleteShareGroupStateRequestData.DeleteStateData() - .setTopicId(topicId2) - .setPartitions(List.of( - new DeleteShareGroupStateRequestData.PartitionData() - .setPartition(partition2) - )) - ) - ); + .setTopics(List.of( + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition1))), + new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition2))) + )); DeleteShareGroupStateResponseData response1 = new DeleteShareGroupStateResponseData() .setResults(List.of( @@ -469,9 +464,7 @@ class ShareCoordinatorServiceTest { eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), eq(Duration.ofMillis(5000)), any() - )) - .thenReturn(CompletableFuture.completedFuture(response1)) - .thenReturn(CompletableFuture.completedFuture(response2)); + )).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2)); CompletableFuture future = service.deleteState( requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), @@ -480,7 +473,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new DeleteShareGroupStateResponseData.DeleteStateResult() .setTopicId(topicId2) .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() @@ -492,6 +485,91 @@ class ShareCoordinatorServiceTest { assertEquals(expectedResult, result); } + @Test + public void testInitializeStateSuccess() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + Metrics metrics = new Metrics(); + ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics); + Time time = mock(Time.class); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + coordinatorMetrics, + time, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition1) + .setStartOffset(0) + .setStateEpoch(1))), + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition2) + .setStartOffset(5) + .setStateEpoch(1))) + )); + + InitializeShareGroupStateResponseData response1 = new InitializeShareGroupStateResponseData().setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition1))) + )); + + InitializeShareGroupStateResponseData response2 = new InitializeShareGroupStateResponseData().setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition2))) + )); + + when(runtime.scheduleWriteOperation( + eq("initialize-share-group-state"), + eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), + eq(Duration.ofMillis(5000)), + any())).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2) + ); + + CompletableFuture future = service.initializeState( + requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition2))), + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition1))) + )); + assertEquals(expectedResult, result); + } + @Test public void testWriteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -680,6 +758,53 @@ class ShareCoordinatorServiceTest { ); } + @Test + public void testInitializeStateValidationError() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + // 1. Empty topicsData + assertEquals(new InitializeShareGroupStateResponseData(), + service.initializeState( + requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + new InitializeShareGroupStateRequestData().setGroupId(groupId) + ).get(5, TimeUnit.SECONDS) + ); + + // 2. Empty partitionsData + assertEquals(new InitializeShareGroupStateResponseData(), + service.initializeState( + requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + new InitializeShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of( + new InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId))) + ).get(5, TimeUnit.SECONDS) + ); + + // 3. Invalid groupId + assertEquals(new InitializeShareGroupStateResponseData(), + service.initializeState( + requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + new InitializeShareGroupStateRequestData().setGroupId(null).setTopics(List.of( + new InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId).setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData().setPartition(partition))))) + ).get(5, TimeUnit.SECONDS) + ); + } + @Test public void testWriteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -702,7 +827,7 @@ class ShareCoordinatorServiceTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicId1) .setPartitions(List.of( @@ -743,7 +868,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new WriteShareGroupStateResponseData.WriteStateResult() .setTopicId(topicId2) .setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult() @@ -781,7 +906,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId1) .setPartitions(List.of( @@ -806,7 +931,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new ReadShareGroupStateResponseData.ReadStateResult() .setTopicId(topicId2) .setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult() @@ -844,7 +969,7 @@ class ShareCoordinatorServiceTest { ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(topicId1) .setPartitions(List.of( @@ -869,7 +994,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() .setTopicId(topicId2) .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() @@ -907,7 +1032,7 @@ class ShareCoordinatorServiceTest { DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Arrays.asList( + .setTopics(List.of( new DeleteShareGroupStateRequestData.DeleteStateData() .setTopicId(topicId1) .setPartitions(List.of( @@ -930,7 +1055,7 @@ class ShareCoordinatorServiceTest { HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); - HashSet expectedResult = new HashSet<>(Arrays.asList( + HashSet expectedResult = new HashSet<>(List.of( new DeleteShareGroupStateResponseData.DeleteStateResult() .setTopicId(topicId2) .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult() @@ -946,6 +1071,66 @@ class ShareCoordinatorServiceTest { assertEquals(expectedResult, result); } + @Test + public void testInitializeStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition1) + )), + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition2) + )) + )); + + CompletableFuture future = service.initializeState( + requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId2) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition2) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Share coordinator is not available."))), + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId1) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition1) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Share coordinator is not available."))))); + assertEquals(expectedResult, result); + } + @Test public void testWriteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -1124,6 +1309,47 @@ class ShareCoordinatorServiceTest { ); } + @Test + public void testInitializeFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + when(runtime.scheduleWriteOperation(any(), any(), any(), any())).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())); + + assertEquals( + new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("Unable to initialize share group state: This server does not host this topic-partition."))) + )), + service.initializeState(requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE), + new InitializeShareGroupStateRequestData().setGroupId(groupId) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )) + ).get(5, TimeUnit.SECONDS) + ); + } + @Test public void testTopicPartitionFor() { CoordinatorRuntime runtime = mockRuntime(); @@ -1676,7 +1902,7 @@ class ShareCoordinatorServiceTest { } private void checkMetrics(Metrics metrics) { - Set usualMetrics = new HashSet<>(Arrays.asList( + Set usualMetrics = new HashSet<>(List.of( metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), @@ -1687,17 +1913,12 @@ class ShareCoordinatorServiceTest { } private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) { - boolean isPresent = metrics.metrics().containsKey( - metrics.metricName( - "last-pruned-offset", - ShareCoordinatorMetrics.METRICS_GROUP, - "The offset at which the share-group state topic was last pruned.", - Map.of( - "topic", topic, - "partition", Integer.toString(partition) - ) - ) - ); + boolean isPresent = metrics.metrics().containsKey(metrics.metricName( + "last-pruned-offset", + ShareCoordinatorMetrics.METRICS_GROUP, + "The offset at which the share-group state topic was last pruned.", + Map.of("topic", topic, "partition", Integer.toString(partition)) + )); assertEquals(checkPresence, isPresent); } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index 7ebb5ce3954..ab136f8dc31 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -29,6 +31,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; @@ -1173,6 +1176,184 @@ class ShareCoordinatorShardTest { verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0)); } + @Test + public void testInitializeStateSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION) + .setStartOffset(10) + .setStateEpoch(5))) + )); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + CoordinatorResult result = shard.initializeState(request); + result.records().forEach(record -> shard.replay(0L, 0L, (short) 0, record)); + + InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0)) + )); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testInitializeStateInvalidRequestData() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + // invalid partition + int partition = -1; + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )); + + CoordinatorResult result = shard.initializeState(request); + + InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + // invalid state epoch + int stateEpoch = 1; + partition = 0; + shard.replay(0L, 0L, (short) 0, ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, TOPIC_ID, partition, new ShareGroupOffset.Builder() + .setStateEpoch(5) + .setSnapshotEpoch(0) + .setStateBatches(List.of()) + .build() + )); + + request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition) + )) + )); + + result = shard.initializeState(request); + + expectedData = InitializeShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, partition, Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception().getMessage()); + expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + } + + @Test + public void testInitializeNullMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + shard.onNewMetadataImage(null, null); + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(0) + )) + )); + + CoordinatorResult result = shard.initializeState(request); + + InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + } + + @Test + public void testInitializeTopicIdNonExistentInMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + MetadataImage image = mock(MetadataImage.class); + shard.onNewMetadataImage(image, null); + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(0) + )) + )); + + // topic id not found in cache + TopicsImage topicsImage = mock(TopicsImage.class); + when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(null); + when(image.topics()).thenReturn(topicsImage); + CoordinatorResult result = shard.initializeState(request); + + InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID)); + } + + @Test + public void testInitializePartitionIdNonExistentInMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + MetadataImage image = mock(MetadataImage.class); + shard.onNewMetadataImage(image, null); + + InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(0) + )) + )); + + // topic id found in cache + TopicsImage topicsImage = mock(TopicsImage.class); + when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(mock(TopicImage.class)); + when(image.topics()).thenReturn(topicsImage); + + // partition id not found + when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(null); + CoordinatorResult result = shard.initializeState(request); + + InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + List expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID)); + verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0)); + } + private static ShareGroupOffset groupOffset(ApiMessage record) { if (record instanceof ShareSnapshotValue) { return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);