From 6e767368908711c6e31cd1bc9fddbce469ba63bf Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Sun, 23 Feb 2025 13:33:13 +0530 Subject: [PATCH] KAFKA-18827: Initialize share group state persister impl [2/N]. (#18992) * This PR provides the implementation of the initialize share group state RPC from the persister's perspective. * Tests have been added wherever applicable. Reviewers: Andrew Schofield --- .../persister/DefaultStatePersister.java | 132 ++- .../persister/PersisterStateManager.java | 212 ++++- .../persister/DefaultStatePersisterTest.java | 330 +++++++- .../persister/PersisterStateManagerTest.java | 754 ++++++++++++++++++ 4 files changed, 1379 insertions(+), 49 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index fdd6c7cb5da..183521eb5f4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -20,6 +20,7 @@ package org.apache.kafka.server.share.persister; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; @@ -33,7 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; /** * The default implementation of the {@link Persister} interface which is used by the * @return A completable future of InitializeShareGroupStateResult */ public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate initialize state request", e); + return CompletableFuture.failedFuture(e); + } + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + + Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.InitializeStateHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<InitializeShareGroupStateResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new InitializeStateHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.stateEpoch(), + partitionData.startOffset(), + future, + null + ) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.InitializeStateHandler::result) + .toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> initializeResponsesToResult(futureMap)); } /** @@ -125,6 +166,51 @@ public class DefaultStatePersister implements Persister { return combinedFuture.thenApply(v
-> writeResponsesToResult(futureMap)); } + /** + * Takes in a list of COMPLETED futures and combines the results, + * taking care of errors if any, into a single InitializeShareGroupStateResult + * + * @param futureMap - HashMap of {topic -> {partition -> future}} + * @return Object representing combined result of type InitializeShareGroupStateResult + */ + // visible for testing + InitializeShareGroupStateResult initializeResponsesToResult( + Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap + ) { + List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream() + .map(partitionFuture -> { + int partition = partitionFuture.getKey(); + CompletableFuture<InitializeShareGroupStateResponse> future = partitionFuture.getValue(); + try { + // already completed because of allOf application in the caller + InitializeShareGroupStateResponse partitionResponse = future.join(); + return partitionResponse.data().results().get(0).partitions().stream() + .map(partitionResult -> PartitionFactory.newPartitionErrorData( + partitionResult.partition(), + partitionResult.errorCode(), + partitionResult.errorMessage())) + .toList(); + } catch (Exception e) { + log.error("Unexpected exception while initializing data in share coordinator", e); + return List.of(PartitionFactory.newPartitionErrorData( + partition, + Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException + "Error initializing state in share coordinator: " + e.getMessage()) + ); + } + }) + .flatMap(List::stream) + .toList(); + return new TopicData<>(topicId, partitionErrData); + }) + .toList(); + return new InitializeShareGroupStateResult.Builder() + .setTopicsData(topicsData) + .build(); + } + /** * Takes in a list of COMPLETED futures and combines the results, * taking care of errors if any, into a single WriteShareGroupStateResult @@ -150,10 +236,10 @@ public class DefaultStatePersister implements Persister { partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage())) - .collect(Collectors.toList()); + .toList(); } catch (Exception e) { log.error("Unexpected exception while writing data to share coordinator", e); - return Collections.singletonList(PartitionFactory.newPartitionErrorData( + return List.of(PartitionFactory.newPartitionErrorData( partition, Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException "Error writing state to share coordinator: " + e.getMessage()) ); } }) .flatMap(List::stream) - .collect(Collectors.toList()); + .toList(); return new TopicData<>(topicId, partitionErrData); }) - .collect(Collectors.toList()); + .toList(); return new WriteShareGroupStateResult.Builder() .setTopicsData(topicsData) .build(); @@ -248,12 +334,12 @@ public class DefaultStatePersister implements Persister { partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage(), - partitionResult.stateBatches().stream().map(PersisterStateBatch::from).collect(Collectors.toList()) + partitionResult.stateBatches().stream().map(PersisterStateBatch::from).toList() )) - .collect(Collectors.toList()); + .toList(); } catch (Exception e) { log.error("Unexpected exception while getting data from share coordinator", e); - return Collections.singletonList(PartitionFactory.newPartitionAllData( + return List.of(PartitionFactory.newPartitionAllData( partition, -1, -1, @@ -264,10 +350,10 @@ public class
DefaultStatePersister implements Persister { } }) .flatMap(List::stream) - .collect(Collectors.toList()); + .toList(); return new TopicData<>(topicId, partitionAllData); }) - .collect(Collectors.toList()); + .toList(); return new ReadShareGroupStateResult.Builder() .setTopicsData(topicsData) .build(); @@ -403,10 +489,10 @@ public class DefaultStatePersister implements Persister { partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage())) - .collect(Collectors.toList()); + .toList(); } catch (Exception e) { log.error("Unexpected exception while getting data from share coordinator", e); - return Collections.singletonList(PartitionFactory.newPartitionStateSummaryData( + return List.of(PartitionFactory.newPartitionStateSummaryData( partition, -1, -1, @@ -415,10 +501,10 @@ public class DefaultStatePersister implements Persister { } }) .flatMap(List::stream) - .collect(Collectors.toList()); + .toList(); return new TopicData<>(topicId, partitionStateErrorData); }) - .collect(Collectors.toList()); + .toList(); return new ReadShareGroupStateSummaryResult.Builder() .setTopicsData(topicsData) .build(); @@ -464,15 +550,27 @@ public class DefaultStatePersister implements Persister { } }) .flatMap(List::stream) - .collect(Collectors.toList()); + .toList(); return new TopicData<>(topicId, partitionErrorData); }) - .collect(Collectors.toList()); + .toList(); return new DeleteShareGroupStateResult.Builder() .setTopicsData(topicsData) .build(); } + private static void validate(InitializeShareGroupStateParameters params) { + String prefix = "Initialize share group parameters"; + if (params == null) { + throw new IllegalArgumentException(prefix + " cannot be null."); + } + if (params.groupTopicPartitionData() == null) { + throw new IllegalArgumentException(prefix + " data cannot be null."); + } + + validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData()); + } + private static void validate(WriteShareGroupStateParameters params) { String prefix = "Write share group parameters"; if (params == null) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index 783ef4fae6a..4e38a795242 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -43,6 +45,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateRequest; 
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; @@ -147,6 +151,7 @@ public class PersisterStateManager { } public enum RPCType { + INITIALIZE, READ, WRITE, DELETE, @@ -483,6 +488,154 @@ public class PersisterStateManager { } } + public class InitializeStateHandler extends PersisterStateManagerHandler { + private final int stateEpoch; + private final long startOffset; + private final CompletableFuture<InitializeShareGroupStateResponse> result; + private final BackoffManager initializeStateBackoff; + + public InitializeStateHandler( + String groupId, + Uuid topicId, + int partition, + int stateEpoch, + long startOffset, + CompletableFuture<InitializeShareGroupStateResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.stateEpoch = stateEpoch; + this.startOffset = startOffset; + this.result = result; + this.initializeStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public InitializeStateHandler( + String groupId, + Uuid topicId, + int partition, + int stateEpoch, + long startOffset, + CompletableFuture<InitializeShareGroupStateResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS + ); + } + + @Override + protected String name() { + return "InitializeStateHandler"; + } + + @Override + protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() { + throw new RuntimeException("Initialize requests are batchable, hence individual requests are not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.INITIALIZE_SHARE_GROUP_STATE; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Initialize state response received - {}", response); + initializeStateBackoff.incrementAttempt(); + + // the response can be a combined one for a large number of requests, + // so we need to deconstruct it + InitializeShareGroupStateResponse combinedResponse = (InitializeShareGroupStateResponse) response.responseBody(); + + for (InitializeShareGroupStateResponseData.InitializeStateResult initializeStateResult : combinedResponse.data().results()) { + if (initializeStateResult.topicId().equals(partitionKey().topicId())) { + Optional<InitializeShareGroupStateResponseData.PartitionResult> partitionStateData = + initializeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + initializeStateBackoff.resetAttempts(); + InitializeShareGroupStateResponseData.InitializeStateResult result = InitializeShareGroupStateResponse.toResponseInitializeStateResult( + partitionKey().topicId(), + List.of(partitionStateData.get()) + ); + this.result.complete(new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData().setResults(List.of(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in initialize state RPC for key {}: {}", partitionKey(),
error.message()); + if (!initializeStateBackoff.canAttempt()) { + log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey()); + requestErrorResponse(error, new Exception("Exhausted max retries to complete initialize state RPC without success.")); + return; + } + super.resetCoordinatorNode(); + timer.add(new PersisterTimerTask(initializeStateBackoff.backOff(), this)); + return; + + default: + log.error("Unable to perform initialize state RPC for key {}: {}", partitionKey(), error.message()); + requestErrorResponse(error, null); + return; + } + } + } + } + + // no response found for the specific topic partition + IllegalStateException exception = new IllegalStateException( + "Failed to initialize state for share partition: " + partitionKey() + ); + requestErrorResponse(Errors.forException(exception), exception); + } + + @Override + public void requestErrorResponse(Errors error, Exception exception) { + this.result.complete(new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in initialize state RPC. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + protected CompletableFuture<InitializeShareGroupStateResponse> result() { + return result; + } + + @Override + protected boolean isBatchable() { + return true; + } + + @Override + protected RPCType rpcType() { + return RPCType.INITIALIZE; + } + } + public class WriteStateHandler extends PersisterStateManagerHandler { private final int stateEpoch; private final int leaderEpoch; @@ -576,10 +729,10 @@ public class PersisterStateManager { writeStateBackoff.resetAttempts(); WriteShareGroupStateResponseData.WriteStateResult result = WriteShareGroupStateResponse.toResponseWriteStateResult( partitionKey().topicId(), - Collections.singletonList(partitionStateData.get()) + List.of(partitionStateData.get()) ); this.result.complete(new WriteShareGroupStateResponse( - new WriteShareGroupStateResponseData().setResults(Collections.singletonList(result)))); + new WriteShareGroupStateResponseData().setResults(List.of(result)))); return; // check retriable errors @@ -718,10 +871,10 @@ public class PersisterStateManager { readStateBackoff.resetAttempts(); ReadShareGroupStateResponseData.ReadStateResult result = ReadShareGroupStateResponse.toResponseReadStateResult( partitionKey().topicId(), - Collections.singletonList(partitionStateData.get()) + List.of(partitionStateData.get()) ); this.result.complete(new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData() - .setResults(Collections.singletonList(result)))); + .setResults(List.of(result)))); return; // check retriable errors @@ -860,10 +1013,10 @@ public class PersisterStateManager { readStateSummaryBackoff.resetAttempts(); ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( partitionKey().topicId(), - Collections.singletonList(partitionStateData.get()) + List.of(partitionStateData.get()) ); this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() -
.setResults(Collections.singletonList(result)))); + .setResults(List.of(result)))); return; // check retriable errors @@ -1126,7 +1279,7 @@ public class PersisterStateManager { return Collections.emptyList(); } log.debug("Sending find coordinator RPC"); - return Collections.singletonList(new RequestAndCompletionHandler( + return List.of(new RequestAndCompletionHandler( time.milliseconds(), randomNode, handler.findShareCoordinatorBuilder(), @@ -1151,7 +1304,7 @@ public class PersisterStateManager { // write: [w1, w2], // read: [r1, r2], // delete: [d1], - // summary: [s1] + // summary: [s1], // } // group2: { // write: [w3, w4] @@ -1241,18 +1394,14 @@ public class PersisterStateManager { */ private static class RequestCoalescerHelper { public static AbstractRequest.Builder<? extends AbstractRequest> coalesceRequests(String groupId, RPCType rpcType, List<? extends PersisterStateManagerHandler> handlers) { - switch (rpcType) { - case WRITE: - return coalesceWrites(groupId, handlers); - case READ: - return coalesceReads(groupId, handlers); - case SUMMARY: - return coalesceReadSummaries(groupId, handlers); - case DELETE: - return coalesceDeletes(groupId, handlers); - default: - throw new RuntimeException("Unknown rpc type: " + rpcType); - } + return switch (rpcType) { + case WRITE -> coalesceWrites(groupId, handlers); + case READ -> coalesceReads(groupId, handlers); + case SUMMARY -> coalesceReadSummaries(groupId, handlers); + case DELETE -> coalesceDeletes(groupId, handlers); + case INITIALIZE -> coalesceInitializations(groupId, handlers); + default -> throw new RuntimeException("Unknown rpc type: " + rpcType); + }; } private static AbstractRequest.Builder<? extends AbstractRequest> coalesceWrites(String groupId, List<? extends PersisterStateManagerHandler> handlers) { @@ -1352,5 +1501,28 @@ public class PersisterStateManager { .setPartitions(entry.getValue())) .toList())); } + + private static AbstractRequest.Builder<? extends AbstractRequest> coalesceInitializations(String groupId, List<? extends PersisterStateManagerHandler> handlers) { + Map<Uuid, List<InitializeShareGroupStateRequestData.PartitionData>> partitionData = new HashMap<>(); + handlers.forEach(persHandler -> { + assert persHandler instanceof InitializeStateHandler; + InitializeStateHandler handler = (InitializeStateHandler) persHandler; + partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>()) + .add( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(handler.partitionKey().partition()) + .setStateEpoch(handler.stateEpoch) + .setStartOffset(handler.startOffset) + ); + }); + + return new InitializeShareGroupStateRequest.Builder(new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(partitionData.entrySet().stream() + .map(entry -> new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(entry.getKey()) + .setPartitions(entry.getValue())) + .toList())); + } } } diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 708de117ad0..e9ba8e90613 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateRequestData; import
org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; @@ -33,6 +34,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateRequest; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; @@ -401,8 +404,8 @@ class DefaultStatePersisterTest { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(groupId) .setTopicsData(List.of(new TopicData<>(null, - List.of(PartitionFactory.newPartitionStateBatchData( - partition, 1, 0, 0, null))))).build()).build()); + List.of(PartitionFactory.newPartitionIdData( + partition))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(IllegalArgumentException.class, result); @@ -430,6 +433,81 @@ class DefaultStatePersisterTest { assertFutureThrows(IllegalArgumentException.class, result); } + @Test + public void testInitializeStateValidate() { + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + int incorrectPartition = -1; + + // Request Parameters are null + DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + CompletableFuture result = defaultStatePersister.initializeState(null); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // groupTopicPartitionData is null + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // groupId is null + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(null).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // topicsData is empty + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of()).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // topicId is null + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder() + 
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(null, + List.of(PartitionFactory.newPartitionStateData( + partition, 1, 0))))).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // partitionData is empty + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(topicId, List.of()))).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + + // partition value is incorrect + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(topicId, + List.of(PartitionFactory.newPartitionStateData( + incorrectPartition, 0, 0))))).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalArgumentException.class, result); + } + @Test public void testWriteStateSuccess() { @@ -996,6 +1074,143 @@ class DefaultStatePersisterTest { assertEquals(expectedResultMap, resultMap); } + @Test + public void testInitializeStateSuccess() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 10; + int stateEpoch1 = 1; + long startOffset1 = 10; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 8; + int stateEpoch2 = 1; + long startOffset2 = 5; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode1 = new Node(5, HOST, PORT); + Node coordinatorNode2 = new Node(6, HOST, PORT); + + String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1); + String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(coordinatorNode1.id()) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(coordinatorNode2.id()) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom( + body -> { + InitializeShareGroupStateRequest request = 
(InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1; + }, + new InitializeShareGroupStateResponse(InitializeShareGroupStateResponse.toResponseData(topicId1, partition1)), + coordinatorNode1 + ); + + client.prepareResponseFrom( + body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2; + }, + new InitializeShareGroupStateResponse(InitializeShareGroupStateResponse.toResponseData(topicId2, partition2)), + coordinatorNode2 + ); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder() + .withKafkaClient(client) + .withCacheHelper(cacheHelper) + .build(); + + InitializeShareGroupStateParameters request = InitializeShareGroupStateParameters.from( + new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of( + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId1) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition1) + .setStateEpoch(stateEpoch1) + .setStartOffset(startOffset1) + )), + new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId2) + .setPartitions(List.of( + new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(partition2) + .setStateEpoch(stateEpoch2) + .setStartOffset(startOffset2) + )) + )) + ); + + CompletableFuture resultFuture = defaultStatePersister.initializeState(request); + + InitializeShareGroupStateResult result = null; + try { + // adding long delay to allow for environment/GC issues + result = resultFuture.get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + fail("Unexpected exception", e); + } + + HashSet resultMap = new HashSet<>(); + result.topicsData().forEach( + topicData -> topicData.partitions().forEach( + partitionData -> resultMap.add((PartitionData) partitionData) + ) + ); + + + HashSet expectedResultMap = new HashSet<>(); + expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition1, Errors.NONE.code(), null)); + + expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition2, Errors.NONE.code(), null)); + + assertEquals(2, result.topicsData().size()); + assertEquals(expectedResultMap, resultMap); + } + @Test public void testWriteStateResponseToResultPartialResults() { Map>> futureMap = new HashMap<>(); @@ -1382,16 +1597,12 @@ class DefaultStatePersisterTest { TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null); // one entry has valid results - futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()) - .put(tp1.partition(), CompletableFuture.completedFuture( - new DeleteShareGroupStateResponse( - DeleteShareGroupStateResponse.toResponseData( - tp1.topicId(), - tp1.partition() - ) - ) - ) - ); + futureMap.computeIfAbsent(tp1.topicId(), k -> new 
HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture( + new DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData( + tp1.topicId(), + tp1.partition() + )) + )); // one entry has failed future futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()) @@ -1422,6 +1633,101 @@ class DefaultStatePersisterTest { ); } + @Test + public void testInitializeStateResponseToResultPartialResults() { + Map>> futureMap = new HashMap<>(); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + + // one entry has valid results + futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture( + new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toResponseData( + tp1.topicId(), + tp1.partition() + )) + )); + + // one entry has error + futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()).put(tp2.partition(), CompletableFuture.completedFuture( + new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toErrorResponseData( + tp2.topicId(), + tp2.partition(), + Errors.UNKNOWN_TOPIC_OR_PARTITION, + "unknown tp" + )) + )); + + PersisterStateManager psm = mock(PersisterStateManager.class); + DefaultStatePersister dsp = new DefaultStatePersister(psm); + + InitializeShareGroupStateResult results = dsp.initializeResponsesToResult(futureMap); + + // results should contain partial results + assertEquals(2, results.topicsData().size()); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp1.topicId(), + List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null)) + ) + ) + ); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp2.topicId(), + List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) + ) + ) + ); + } + + @Test + public void testInitializeStateResponseToResultFailedFuture() { + Map>> futureMap = new HashMap<>(); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + + // one entry has valid results + futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture( + new InitializeShareGroupStateResponse( + InitializeShareGroupStateResponse.toResponseData( + tp1.topicId(), + tp1.partition() + )) + )); + + // one entry has failed future + futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()) + .put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff"))); + + PersisterStateManager psm = mock(PersisterStateManager.class); + DefaultStatePersister dsp = new DefaultStatePersister(psm); + + InitializeShareGroupStateResult results = dsp.initializeResponsesToResult(futureMap); + + // results should contain partial results + assertEquals(2, results.topicsData().size()); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp1.topicId(), + List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null)) + ) + ) + ); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp2.topicId(), + List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_SERVER_ERROR.code(), "Error initializing state in share coordinator: java.lang.Exception: scary stuff")) + ) + ) + ); + } + @Test public void 
testDefaultPersisterClose() { PersisterStateManager psm = mock(PersisterStateManager.class); diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java index 0519392bc00..584b51c6e22 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteShareGroupStateResponseData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; @@ -34,6 +35,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest; import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.InitializeShareGroupStateRequest; +import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; @@ -76,6 +79,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("JavaNCSS") class PersisterStateManagerTest { private static final KafkaClient CLIENT = mock(KafkaClient.class); private static final MockTime MOCK_TIME = new MockTime(); @@ -3716,6 +3720,756 @@ class PersisterStateManagerTest { TestUtils.waitForCondition(isBatchingSuccess::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching"); } + @Test + public void testInitializeStateRequestCoordinatorFoundSuccessfully() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 1; + long startOffset = 10; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(Collections.singletonList( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + InitializeShareGroupStateResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(1)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned in correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + int startOffset = 11; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(Collections.singletonList( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.NOT_COORDINATOR.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + 
.setCoordinators(Collections.singletonList( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + TestUtils.waitForCondition(resultFuture::isDone, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future"); + + InitializeShareGroupStateResponse result = resultFuture.get(); + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(2)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestCoordinatorFoundOnRetry() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + long startOffset = 12; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(Collections.singletonList( + new FindCoordinatorResponseData.Coordinator() + 
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(Collections.singletonList( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + InitializeShareGroupStateResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(2)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned in correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestWithCoordinatorNodeLookup() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + long startOffset = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); 
+ Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + InitializeShareGroupStateResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(1)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned in correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestWithRetryAndCoordinatorNodeLookup() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + long startOffset = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + client.prepareResponseFrom(body -> { + 
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + InitializeShareGroupStateResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(2)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned in correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestFailedMaxRetriesExhausted() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + long startOffset = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + 
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + 2 + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + InitializeShareGroupStateResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(2)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned in correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testInitializeStateRequestBatchingWithCoordinatorNodeLookup() 
throws Exception { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + int stateEpoch = 5; + long startOffset = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new InitializeShareGroupStateResponse( + new InitializeShareGroupStateResponseData() + .setResults(List.of( + new InitializeShareGroupStateResponseData.InitializeStateResult() + .setTopicId(topicId) + .setPartitions(List.of( + new InitializeShareGroupStateResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + AtomicBoolean isBatchingSuccess = new AtomicBoolean(false); + stateManager.setGenerateCallback(() -> { + Map>> handlersPerType = stateManager.nodeRPCMap().get(coordinatorNode); + if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.INITIALIZE) && handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE).containsKey(groupId)) { + if (handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE).get(groupId).size() > 2) + isBatchingSuccess.set(true); + } + }); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + List handlers = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler( + groupId, + topicId, + partition, + stateEpoch, + startOffset, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS + )); + handlers.add(handler); + stateManager.enqueue(handler); + } + + CompletableFuture.allOf(handlers.stream() + .map(PersisterStateManager.InitializeStateHandler::result).toArray(CompletableFuture[]::new)).get(); + + TestUtils.waitForCondition(isBatchingSuccess::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching"); + } + @Test public void testPersisterStateManagerClose() { KafkaClient client = mock(KafkaClient.class);
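
Note (not part of the patch): a minimal usage sketch of the new API, showing how a caller drives DefaultStatePersister.initializeState() end to end. The request-building and result types are taken from the diff above; the persister wiring (Kafka client, cache helper) is assumed to be set up as in DefaultStatePersisterTest, and `persister` and `topicId` are placeholders.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;

    // Build the parameters the same way the new tests do: one topic, one partition.
    InitializeShareGroupStateParameters params = InitializeShareGroupStateParameters.from(
        new InitializeShareGroupStateRequestData()
            .setGroupId("group1")
            .setTopics(List.of(
                new InitializeShareGroupStateRequestData.InitializeStateData()
                    .setTopicId(topicId)   // placeholder Uuid
                    .setPartitions(List.of(
                        new InitializeShareGroupStateRequestData.PartitionData()
                            .setPartition(0)
                            .setStateEpoch(1)
                            .setStartOffset(10L))))));

    // initializeState() validates the request, enqueues one InitializeStateHandler per
    // partition, and combines the completed per-partition futures into a single result.
    CompletableFuture<InitializeShareGroupStateResult> future = persister.initializeState(params);
    InitializeShareGroupStateResult result = future.get(5, TimeUnit.SECONDS);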
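One behavioral point worth calling out for reviewers: initializeResponsesToResult() folds a failed per-partition future into an UNKNOWN_SERVER_ERROR entry instead of failing the combined future (see testInitializeStateResponseToResultFailedFuture above), so partial success is possible and callers must inspect each partition's error code. A sketch of that check, with an illustrative logging policy that is not part of the patch:

    result.topicsData().forEach(topicData ->
        topicData.partitions().forEach(partition -> {
            if (partition.errorCode() != Errors.NONE.code()) {
                // Only this partition's initialization failed; others may have succeeded.
                log.warn("Initialize failed for {}-{}: {}",
                    topicData.topicId(), partition.partition(), partition.errorMessage());
            }
        }));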