KAFKA-18827: Initialize share group state persister impl [2/N]. (#18992)

* In this PR, we have provided an implementation of the initialize share
group state RPC from the persister's perspective.
* Tests have been added wherever applicable.
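
For orientation, this is roughly how the new RPC is driven end to end. A sketch only: it borrows the DefaultStatePersisterBuilder helper from the tests below, assumes the client/cache-helper wiring shown in those tests (the `client` and `cacheHelper` variables here stand in for that wiring), and assumes the PartitionErrorData accessors (partition(), errorCode()) from the persister API:

    // Persister wired as in DefaultStatePersisterTest (client/cacheHelper assumed).
    DefaultStatePersister persister = DefaultStatePersisterBuilder.builder()
        .withKafkaClient(client)
        .withCacheHelper(cacheHelper)
        .build();

    // One share partition: partition 0, state epoch 1, start offset 0.
    Uuid topicId = Uuid.randomUuid();
    InitializeShareGroupStateParameters request = new InitializeShareGroupStateParameters.Builder()
        .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
            .setGroupId("group1")
            .setTopicsData(List.of(new TopicData<>(topicId,
                List.of(PartitionFactory.newPartitionStateData(0, 1, 0)))))
            .build())
        .build();

    // The future completes once every partition has a result or an error;
    // Errors.NONE.code() in the per-partition result means success.
    CompletableFuture<InitializeShareGroupStateResult> result = persister.initializeState(request);
    result.thenAccept(r -> r.topicsData().forEach(topicData ->
        topicData.partitions().forEach(p ->
            System.out.println(topicData.topicId() + "-" + p.partition() + ": " + p.errorCode()))));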

Reviewers: Andrew Schofield <aschofield@confluent.io>
Sushant Mahajan 2025-02-23 13:33:13 +05:30 committed by GitHub
parent a1372ced69
commit 6e76736890
4 changed files with 1379 additions and 49 deletions

DefaultStatePersister.java

@@ -20,6 +20,7 @@ package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -33,7 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
/**
* The default implementation of the {@link Persister} interface which is used by the
@@ -70,7 +70,48 @@ public class DefaultStatePersister implements Persister {
* @return A completable future of InitializeShareGroupStateResult
*/
public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) {
-throw new RuntimeException("not implemented");
+try {
validate(request);
} catch (Exception e) {
log.error("Unable to validate initialize state request", e);
return CompletableFuture.failedFuture(e);
}
GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData();
String groupId = gtp.groupId();
Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap = new HashMap<>();
List<PersisterStateManager.InitializeStateHandler> handlers = new ArrayList<>();
gtp.topicsData().forEach(topicData -> {
topicData.partitions().forEach(partitionData -> {
CompletableFuture<InitializeShareGroupStateResponse> future = futureMap
.computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
.computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
handlers.add(
stateManager.new InitializeStateHandler(
groupId,
topicData.topicId(),
partitionData.partition(),
partitionData.stateEpoch(),
partitionData.startOffset(),
future,
null
)
);
});
});
for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
stateManager.enqueue(handler);
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
handlers.stream()
.map(PersisterStateManager.InitializeStateHandler::result)
.toArray(CompletableFuture[]::new));
return combinedFuture.thenApply(v -> initializeResponsesToResult(futureMap));
}
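The fan-out/fan-in shape above — one handler and one future per share partition, joined with CompletableFuture.allOf — is the same pattern writeState uses. In isolation the idiom looks like this (a plain-Java sketch with hypothetical task names, not the Kafka API):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class FanOutFanIn {
        public static void main(String[] args) {
            // Fan out: one future per unit of work (per share partition in this PR).
            List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "partition-0: ok"),
                CompletableFuture.supplyAsync(() -> "partition-1: ok"));

            // Fan in: allOf completes only when every child future has completed,
            // so the join() calls inside thenApply never block.
            CompletableFuture<List<String>> combined = CompletableFuture
                .allOf(futures.toArray(CompletableFuture[]::new))
                .thenApply(v -> futures.stream().map(CompletableFuture::join).toList());

            combined.join().forEach(System.out::println);
        }
    }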
/**
@@ -125,6 +166,51 @@ public class DefaultStatePersister implements Persister {
return combinedFuture.thenApply(v -> writeResponsesToResult(futureMap));
}
/**
* Takes in a list of COMPLETED futures and combines the results,
* taking care of errors if any, into a single InitializeShareGroupStateResult
*
* @param futureMap - HashMap of {topic -> {partition -> future}}
* @return Object representing combined result of type InitializeShareGroupStateResult
*/
// visible for testing
InitializeShareGroupStateResult initializeResponsesToResult(
Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap
) {
List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
.map(topicId -> {
List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream()
.map(partitionFuture -> {
int partition = partitionFuture.getKey();
CompletableFuture<InitializeShareGroupStateResponse> future = partitionFuture.getValue();
try {
// already completed because of allOf application in the caller
InitializeShareGroupStateResponse partitionResponse = future.join();
return partitionResponse.data().results().get(0).partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionErrorData(
partitionResult.partition(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
.toList();
} catch (Exception e) {
log.error("Unexpected exception while initializing data in share coordinator", e);
return List.of(PartitionFactory.newPartitionErrorData(
partition,
Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException
"Error initializing state in share coordinator: " + e.getMessage())
);
}
})
.flatMap(List::stream)
.toList();
return new TopicData<>(topicId, partitionErrData);
})
.toList();
return new InitializeShareGroupStateResult.Builder()
.setTopicsData(topicsData)
.build();
}
/**
* Takes in a list of COMPLETED futures and combines the results,
* taking care of errors if any, into a single WriteShareGroupStateResult
@@ -150,10 +236,10 @@ public class DefaultStatePersister implements Persister {
partitionResult.partition(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
-.collect(Collectors.toList());
+.toList();
} catch (Exception e) {
log.error("Unexpected exception while writing data to share coordinator", e);
-return Collections.singletonList(PartitionFactory.newPartitionErrorData(
+return List.of(PartitionFactory.newPartitionErrorData(
partition,
Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException
"Error writing state to share coordinator: " + e.getMessage())
@@ -161,10 +247,10 @@ public class DefaultStatePersister implements Persister {
}
})
.flatMap(List::stream)
-.collect(Collectors.toList());
+.toList();
return new TopicData<>(topicId, partitionErrData);
})
-.collect(Collectors.toList());
+.toList();
return new WriteShareGroupStateResult.Builder()
.setTopicsData(topicsData)
.build();
@@ -248,12 +334,12 @@ public class DefaultStatePersister implements Persister {
partitionResult.startOffset(),
partitionResult.errorCode(),
partitionResult.errorMessage(),
-partitionResult.stateBatches().stream().map(PersisterStateBatch::from).collect(Collectors.toList())
+partitionResult.stateBatches().stream().map(PersisterStateBatch::from).toList()
))
-.collect(Collectors.toList());
+.toList();
} catch (Exception e) {
log.error("Unexpected exception while getting data from share coordinator", e);
-return Collections.singletonList(PartitionFactory.newPartitionAllData(
+return List.of(PartitionFactory.newPartitionAllData(
partition,
-1,
-1,
@@ -264,10 +350,10 @@ public class DefaultStatePersister implements Persister {
}
})
.flatMap(List::stream)
-.collect(Collectors.toList());
+.toList();
return new TopicData<>(topicId, partitionAllData);
})
-.collect(Collectors.toList());
+.toList();
return new ReadShareGroupStateResult.Builder()
.setTopicsData(topicsData)
.build();
@@ -403,10 +489,10 @@ public class DefaultStatePersister implements Persister {
partitionResult.startOffset(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
-.collect(Collectors.toList());
+.toList();
} catch (Exception e) {
log.error("Unexpected exception while getting data from share coordinator", e);
-return Collections.singletonList(PartitionFactory.newPartitionStateSummaryData(
+return List.of(PartitionFactory.newPartitionStateSummaryData(
partition,
-1,
-1,
@@ -415,10 +501,10 @@ public class DefaultStatePersister implements Persister {
}
})
.flatMap(List::stream)
-.collect(Collectors.toList());
+.toList();
return new TopicData<>(topicId, partitionStateErrorData);
})
-.collect(Collectors.toList());
+.toList();
return new ReadShareGroupStateSummaryResult.Builder()
.setTopicsData(topicsData)
.build();
@@ -464,15 +550,27 @@ public class DefaultStatePersister implements Persister {
}
})
.flatMap(List::stream)
-.collect(Collectors.toList());
+.toList();
return new TopicData<>(topicId, partitionErrorData);
})
-.collect(Collectors.toList());
+.toList();
return new DeleteShareGroupStateResult.Builder()
.setTopicsData(topicsData)
.build();
}
private static void validate(InitializeShareGroupStateParameters params) {
String prefix = "Initialize share group parameters";
if (params == null) {
throw new IllegalArgumentException(prefix + " cannot be null.");
}
if (params.groupTopicPartitionData() == null) {
throw new IllegalArgumentException(prefix + " data cannot be null.");
}
validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData());
}
private static void validate(WriteShareGroupStateParameters params) {
String prefix = "Write share group parameters";
if (params == null) {

PersisterStateManager.java

@@ -29,6 +29,8 @@ import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -43,6 +45,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
@@ -147,6 +151,7 @@ public class PersisterStateManager {
}
public enum RPCType {
INITIALIZE,
READ,
WRITE,
DELETE,
@@ -483,6 +488,154 @@ public class PersisterStateManager {
}
}
public class InitializeStateHandler extends PersisterStateManagerHandler {
private final int stateEpoch;
private final long startOffset;
private final CompletableFuture<InitializeShareGroupStateResponse> result;
private final BackoffManager initializeStateBackoff;
public InitializeStateHandler(
String groupId,
Uuid topicId,
int partition,
int stateEpoch,
long startOffset,
CompletableFuture<InitializeShareGroupStateResponse> result,
long backoffMs,
long backoffMaxMs,
int maxRPCRetryAttempts
) {
super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
this.stateEpoch = stateEpoch;
this.startOffset = startOffset;
this.result = result;
this.initializeStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
}
public InitializeStateHandler(
String groupId,
Uuid topicId,
int partition,
int stateEpoch,
long startOffset,
CompletableFuture<InitializeShareGroupStateResponse> result,
Consumer<ClientResponse> onCompleteCallback
) {
this(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
result,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_FIND_COORD_ATTEMPTS
);
}
@Override
protected String name() {
return "InitializeStateHandler";
}
@Override
protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
throw new RuntimeException("Initialize requests are batchable, hence individual requests not needed.");
}
@Override
protected boolean isResponseForRequest(ClientResponse response) {
return response.requestHeader().apiKey() == ApiKeys.INITIALIZE_SHARE_GROUP_STATE;
}
@Override
protected void handleRequestResponse(ClientResponse response) {
log.debug("Initialize state response received - {}", response);
initializeStateBackoff.incrementAttempt();
// response can be a combined one for large number of requests
// we need to deconstruct it
InitializeShareGroupStateResponse combinedResponse = (InitializeShareGroupStateResponse) response.responseBody();
for (InitializeShareGroupStateResponseData.InitializeStateResult initializeStateResult : combinedResponse.data().results()) {
if (initializeStateResult.topicId().equals(partitionKey().topicId())) {
Optional<InitializeShareGroupStateResponseData.PartitionResult> partitionStateData =
initializeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
.findFirst();
if (partitionStateData.isPresent()) {
Errors error = Errors.forCode(partitionStateData.get().errorCode());
switch (error) {
case NONE:
initializeStateBackoff.resetAttempts();
InitializeShareGroupStateResponseData.InitializeStateResult result = InitializeShareGroupStateResponse.toResponseInitializeStateResult(
partitionKey().topicId(),
List.of(partitionStateData.get())
);
this.result.complete(new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData().setResults(List.of(result))));
return;
// check retriable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
log.warn("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message());
if (!initializeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new Exception("Exhausted max retries to complete initialize state RPC without success."));
return;
}
super.resetCoordinatorNode();
timer.add(new PersisterTimerTask(initializeStateBackoff.backOff(), this));
return;
default:
log.error("Unable to perform initialize state RPC for key {}: {}", partitionKey(), error.message());
requestErrorResponse(error, null);
return;
}
}
}
}
// no response found for the specific topic partition
IllegalStateException exception = new IllegalStateException(
"Failed to initialize state for share partition: " + partitionKey()
);
requestErrorResponse(Errors.forException(exception), exception);
}
@Override
public void requestErrorResponse(Errors error, Exception exception) {
this.result.complete(new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in initialize state RPC. " +
(exception == null ? error.message() : exception.getMessage()))));
}
@Override
protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
this.result.complete(new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " +
(exception == null ? error.message() : exception.getMessage()))));
}
protected CompletableFuture<InitializeShareGroupStateResponse> result() {
return result;
}
@Override
protected boolean isBatchable() {
return true;
}
@Override
protected RPCType rpcType() {
return RPCType.INITIALIZE;
}
}
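The retry handling in handleRequestResponse above follows the handlers' common recipe: count the attempt, check canAttempt(), and re-enqueue after backOff() via a timer task. The same bounded-exponential-backoff idiom in isolation (a hypothetical stand-alone sketch, not the BackoffManager implementation used in this file):

    // Hypothetical stand-in for the BackoffManager pattern used by the handlers:
    // retry a retriable error a bounded number of times, waiting exponentially
    // longer (capped) between attempts.
    final class RetrySketch {
        public static void main(String[] args) throws InterruptedException {
            int maxAttempts = 5;
            long baseMs = 100L;
            long maxMs = 30_000L;

            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                if (tryRpc()) {
                    return; // success: the handler would complete its future and stop
                }
                if (attempt == maxAttempts) {
                    throw new IllegalStateException("Exhausted max retries without success.");
                }
                long waitMs = Math.min(maxMs, baseMs * (1L << attempt)); // exponential, capped
                Thread.sleep(waitMs); // the real code schedules a timer task instead of sleeping
            }
        }

        private static boolean tryRpc() {
            return Math.random() > 0.5; // placeholder for a retriable RPC outcome
        }
    }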
public class WriteStateHandler extends PersisterStateManagerHandler {
private final int stateEpoch;
private final int leaderEpoch;
@@ -576,10 +729,10 @@ public class PersisterStateManager {
writeStateBackoff.resetAttempts();
WriteShareGroupStateResponseData.WriteStateResult result = WriteShareGroupStateResponse.toResponseWriteStateResult(
partitionKey().topicId(),
-Collections.singletonList(partitionStateData.get())
+List.of(partitionStateData.get())
);
this.result.complete(new WriteShareGroupStateResponse(
-new WriteShareGroupStateResponseData().setResults(Collections.singletonList(result))));
+new WriteShareGroupStateResponseData().setResults(List.of(result))));
return;
// check retriable errors
@@ -718,10 +871,10 @@ public class PersisterStateManager {
readStateBackoff.resetAttempts();
ReadShareGroupStateResponseData.ReadStateResult result = ReadShareGroupStateResponse.toResponseReadStateResult(
partitionKey().topicId(),
-Collections.singletonList(partitionStateData.get())
+List.of(partitionStateData.get())
);
this.result.complete(new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
-.setResults(Collections.singletonList(result))));
+.setResults(List.of(result))));
return;
// check retriable errors
@@ -860,10 +1013,10 @@ public class PersisterStateManager {
readStateSummaryBackoff.resetAttempts();
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
partitionKey().topicId(),
-Collections.singletonList(partitionStateData.get())
+List.of(partitionStateData.get())
);
this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
-.setResults(Collections.singletonList(result))));
+.setResults(List.of(result))));
return;
// check retriable errors
@@ -1126,7 +1279,7 @@ public class PersisterStateManager {
return Collections.emptyList();
}
log.debug("Sending find coordinator RPC");
-return Collections.singletonList(new RequestAndCompletionHandler(
+return List.of(new RequestAndCompletionHandler(
time.milliseconds(),
randomNode,
handler.findShareCoordinatorBuilder(),
@@ -1151,7 +1304,7 @@ public class PersisterStateManager {
// write: [w1, w2],
// read: [r1, r2],
// delete: [d1],
-// summary: [s1]
+// summary: [s1],
// }
// group2: {
// write: [w3, w4]
@@ -1241,18 +1394,14 @@ public class PersisterStateManager {
*/
private static class RequestCoalescerHelper {
public static AbstractRequest.Builder<? extends AbstractRequest> coalesceRequests(String groupId, RPCType rpcType, List<? extends PersisterStateManagerHandler> handlers) {
-switch (rpcType) {
-case WRITE:
-return coalesceWrites(groupId, handlers);
-case READ:
-return coalesceReads(groupId, handlers);
-case SUMMARY:
-return coalesceReadSummaries(groupId, handlers);
-case DELETE:
-return coalesceDeletes(groupId, handlers);
-default:
-throw new RuntimeException("Unknown rpc type: " + rpcType);
-}
+return switch (rpcType) {
+case WRITE -> coalesceWrites(groupId, handlers);
+case READ -> coalesceReads(groupId, handlers);
+case SUMMARY -> coalesceReadSummaries(groupId, handlers);
+case DELETE -> coalesceDeletes(groupId, handlers);
+case INITIALIZE -> coalesceInitializations(groupId, handlers);
+default -> throw new RuntimeException("Unknown rpc type: " + rpcType);
+};
}
private static AbstractRequest.Builder<? extends AbstractRequest> coalesceWrites(String groupId, List<? extends PersisterStateManagerHandler> handlers) {
@@ -1352,5 +1501,28 @@ public class PersisterStateManager {
.setPartitions(entry.getValue()))
.toList()));
}
private static AbstractRequest.Builder<? extends AbstractRequest> coalesceInitializations(String groupId, List<? extends PersisterStateManagerHandler> handlers) {
Map<Uuid, List<InitializeShareGroupStateRequestData.PartitionData>> partitionData = new HashMap<>();
handlers.forEach(persHandler -> {
assert persHandler instanceof InitializeStateHandler;
InitializeStateHandler handler = (InitializeStateHandler) persHandler;
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>())
.add(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(handler.partitionKey().partition())
.setStateEpoch(handler.stateEpoch)
.setStartOffset(handler.startOffset)
);
});
return new InitializeShareGroupStateRequest.Builder(new InitializeShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(partitionData.entrySet().stream()
.map(entry -> new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(entry.getKey())
.setPartitions(entry.getValue()))
.toList()));
}
}
}

DefaultStatePersisterTest.java

@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -33,6 +34,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
@@ -401,8 +404,8 @@ class DefaultStatePersisterTest {
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(null,
-List.of(PartitionFactory.newPartitionStateBatchData(
-partition, 1, 0, 0, null))))).build()).build());
+List.of(PartitionFactory.newPartitionIdData(
+partition))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
@@ -430,6 +433,81 @@ class DefaultStatePersisterTest {
assertFutureThrows(IllegalArgumentException.class, result);
}
@Test
public void testInitializeStateValidate() {
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
int incorrectPartition = -1;
// Request Parameters are null
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
CompletableFuture<InitializeShareGroupStateResult> result = defaultStatePersister.initializeState(null);
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// groupTopicPartitionData is null
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// groupId is null
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(null).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// topicsData is empty
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId)
.setTopicsData(List.of()).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// topicId is null
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(null,
List.of(PartitionFactory.newPartitionStateData(
partition, 1, 0))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// partitionData is empty
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(topicId, List.of()))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
// partition value is incorrect
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.initializeState(new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(topicId,
List.of(PartitionFactory.newPartitionStateData(
incorrectPartition, 0, 0))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
}
@Test
public void testWriteStateSuccess() {
@@ -996,6 +1074,143 @@ class DefaultStatePersisterTest {
assertEquals(expectedResultMap, resultMap);
}
@Test
public void testInitializeStateSuccess() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId1 = Uuid.randomUuid();
int partition1 = 10;
int stateEpoch1 = 1;
long startOffset1 = 10;
Uuid topicId2 = Uuid.randomUuid();
int partition2 = 8;
int stateEpoch2 = 1;
long startOffset2 = 5;
Node suppliedNode = new Node(0, HOST, PORT);
Node coordinatorNode1 = new Node(5, HOST, PORT);
Node coordinatorNode2 = new Node(6, HOST, PORT);
String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1);
String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(List.of(
new FindCoordinatorResponseData.Coordinator()
.setNodeId(coordinatorNode1.id())
.setHost(HOST)
.setPort(PORT)
.setErrorCode(Errors.NONE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(List.of(
new FindCoordinatorResponseData.Coordinator()
.setNodeId(coordinatorNode2.id())
.setHost(HOST)
.setPort(PORT)
.setErrorCode(Errors.NONE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(
body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1;
},
new InitializeShareGroupStateResponse(InitializeShareGroupStateResponse.toResponseData(topicId1, partition1)),
coordinatorNode1
);
client.prepareResponseFrom(
body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2;
},
new InitializeShareGroupStateResponse(InitializeShareGroupStateResponse.toResponseData(topicId2, partition2)),
coordinatorNode2
);
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder()
.withKafkaClient(client)
.withCacheHelper(cacheHelper)
.build();
InitializeShareGroupStateParameters request = InitializeShareGroupStateParameters.from(
new InitializeShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStateEpoch(stateEpoch1)
.setStartOffset(startOffset1)
)),
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStateEpoch(stateEpoch2)
.setStartOffset(startOffset2)
))
))
);
CompletableFuture<InitializeShareGroupStateResult> resultFuture = defaultStatePersister.initializeState(request);
InitializeShareGroupStateResult result = null;
try {
// adding long delay to allow for environment/GC issues
result = resultFuture.get(10L, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Unexpected exception", e);
}
HashSet<PartitionData> resultMap = new HashSet<>();
result.topicsData().forEach(
topicData -> topicData.partitions().forEach(
partitionData -> resultMap.add((PartitionData) partitionData)
)
);
HashSet<PartitionData> expectedResultMap = new HashSet<>();
expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition1, Errors.NONE.code(), null));
expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition2, Errors.NONE.code(), null));
assertEquals(2, result.topicsData().size());
assertEquals(expectedResultMap, resultMap);
}
@Test
public void testWriteStateResponseToResultPartialResults() {
Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
@@ -1382,16 +1597,12 @@ class DefaultStatePersisterTest {
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
// one entry has valid results
-futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
-.put(tp1.partition(), CompletableFuture.completedFuture(
-new DeleteShareGroupStateResponse(
-DeleteShareGroupStateResponse.toResponseData(
-tp1.topicId(),
-tp1.partition()
-)
-)
-)
-);
+futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture(
+new DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData(
+tp1.topicId(),
+tp1.partition()
+))
+));
// one entry has failed future
futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
@@ -1422,6 +1633,101 @@ class DefaultStatePersisterTest {
);
}
@Test
public void testInitializeStateResponseToResultPartialResults() {
Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap = new HashMap<>();
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
// one entry has valid results
futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture(
new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toResponseData(
tp1.topicId(),
tp1.partition()
))
));
// one entry has error
futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()).put(tp2.partition(), CompletableFuture.completedFuture(
new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toErrorResponseData(
tp2.topicId(),
tp2.partition(),
Errors.UNKNOWN_TOPIC_OR_PARTITION,
"unknown tp"
))
));
PersisterStateManager psm = mock(PersisterStateManager.class);
DefaultStatePersister dsp = new DefaultStatePersister(psm);
InitializeShareGroupStateResult results = dsp.initializeResponsesToResult(futureMap);
// results should contain partial results
assertEquals(2, results.topicsData().size());
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null))
)
)
);
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
)
)
);
}
@Test
public void testInitializeStateResponseToResultFailedFuture() {
Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponse>>> futureMap = new HashMap<>();
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
// one entry has valid results
futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()).put(tp1.partition(), CompletableFuture.completedFuture(
new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toResponseData(
tp1.topicId(),
tp1.partition()
))
));
// one entry has failed future
futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
.put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff")));
PersisterStateManager psm = mock(PersisterStateManager.class);
DefaultStatePersister dsp = new DefaultStatePersister(psm);
InitializeShareGroupStateResult results = dsp.initializeResponsesToResult(futureMap);
// results should contain partial results
assertEquals(2, results.topicsData().size());
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null))
)
)
);
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_SERVER_ERROR.code(), "Error initializing state in share coordinator: java.lang.Exception: scary stuff"))
)
)
);
}
@Test
public void testDefaultPersisterClose() {
PersisterStateManager psm = mock(PersisterStateManager.class);

PersisterStateManagerTest.java

@@ -25,6 +25,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
@@ -34,6 +35,8 @@ import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
@@ -76,6 +79,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("JavaNCSS")
class PersisterStateManagerTest {
private static final KafkaClient CLIENT = mock(KafkaClient.class);
private static final MockTime MOCK_TIME = new MockTime();
@@ -3716,6 +3720,756 @@ class PersisterStateManagerTest {
TestUtils.waitForCondition(isBatchingSuccess::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
}
@Test
public void testInitializeStateRequestCoordinatorFoundSuccessfully() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 1;
long startOffset = 10;
Node suppliedNode = new Node(0, HOST, PORT);
Node coordinatorNode = new Node(1, HOST, PORT);
String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
.setNodeId(1)
.setHost(HOST)
.setPort(PORT)
.setErrorCode(Errors.NONE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
InitializeShareGroupStateResponse result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
fail("Failed to get result from future", e);
}
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(1)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
// Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
@Test
public void testInitializeStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
int startOffset = 11;
Node suppliedNode = new Node(0, HOST, PORT);
Node coordinatorNode = new Node(1, HOST, PORT);
String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
.setErrorCode(Errors.NOT_COORDINATOR.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
.setNodeId(1)
.setHost(HOST)
.setPort(PORT)
.setErrorCode(Errors.NONE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
TestUtils.waitForCondition(resultFuture::isDone, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
InitializeShareGroupStateResponse result = resultFuture.get();
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(2)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
// Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
@Test
public void testInitializeStateRequestCoordinatorFoundOnRetry() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
long startOffset = 12;
Node suppliedNode = new Node(0, HOST, PORT);
Node coordinatorNode = new Node(1, HOST, PORT);
String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
&& ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
&& ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(Collections.singletonList(
new FindCoordinatorResponseData.Coordinator()
.setNodeId(1)
.setHost(HOST)
.setPort(PORT)
.setErrorCode(Errors.NONE.code())
))
),
suppliedNode
);
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
InitializeShareGroupStateResponse result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
fail("Failed to get result from future", e);
}
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(2)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
// Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
@Test
public void testInitializeStateRequestWithCoordinatorNodeLookup() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
long startOffset = 10;
Node coordinatorNode = new Node(1, HOST, PORT);
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
InitializeShareGroupStateResponse result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
fail("Failed to get result from future", e);
}
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(0)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
verify(handler, times(1)).onComplete(any());
// Verifying the coordinator node was populated directly from the metadata cache, without a FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
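
/**
 * Verifies the retry path: the first initialize response carries the retriable
 * COORDINATOR_LOAD_IN_PROGRESS error, so the handler retries and succeeds on the
 * second response.
 */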
@Test
public void testInitializeStateRequestWithRetryAndCoordinatorNodeLookup() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
long startOffset = 10;
Node coordinatorNode = new Node(1, HOST, PORT);
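// First attempt fails with a retriable error to force a retry.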
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setErrorMessage("")
))
))
), coordinatorNode);
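// Second attempt succeeds.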
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
InitializeShareGroupStateResponse result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
fail("Failed to get result from future", e);
}
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(0)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
verify(handler, times(2)).onComplete(any());
// Verifying the coordinator node was populated directly from the metadata cache, without a FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.NONE.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
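
/**
 * Verifies that the handler stops retrying once the RPC retry limit (2 here) is reached:
 * two consecutive COORDINATOR_LOAD_IN_PROGRESS responses exhaust the attempts, so the
 * retriable error is surfaced and the queued success response is never consumed.
 */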
@Test
public void testInitializeStateRequestFailedMaxRetriesExhausted() {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
long startOffset = 10;
Node coordinatorNode = new Node(1, HOST, PORT);
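// First of two retriable failures.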
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setErrorMessage("")
))
))
), coordinatorNode);
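// Second retriable failure; this exhausts the retry limit of 2.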
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setErrorMessage("")
))
))
), coordinatorNode);
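// A success response is queued but should never be consumed, since the attempts run out first.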
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
2
));
stateManager.enqueue(handler);
CompletableFuture<InitializeShareGroupStateResponse> resultFuture = handler.result();
InitializeShareGroupStateResponse result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
fail("Failed to get result from future", e);
}
InitializeShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
verify(handler, times(0)).findShareCoordinatorBuilder();
verify(handler, times(0)).requestBuilder();
verify(handler, times(2)).onComplete(any());
// Verifying the coordinator node was populated directly from the metadata cache, without a FIND_COORDINATOR request
assertEquals(coordinatorNode, handler.getCoordinatorNode());
// Verifying the result returned is correct
assertEquals(partition, partitionResult.partition());
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode());
try {
// Stopping the state manager
stateManager.stop();
} catch (Exception e) {
fail("Failed to stop state manager", e);
}
}
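
/**
 * Verifies that multiple initialize handlers enqueued for the same group are batched
 * into a single request to the coordinator node.
 */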
@Test
public void testInitializeStateRequestBatchingWithCoordinatorNodeLookup() throws Exception {
MockClient client = new MockClient(MOCK_TIME);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 10;
int stateEpoch = 5;
long startOffset = 10;
Node coordinatorNode = new Node(1, HOST, PORT);
client.prepareResponseFrom(body -> {
InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest) body;
String requestGroupId = request.data().groupId();
Uuid requestTopicId = request.data().topics().get(0).topicId();
int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
}, new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData()
.setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
))
))
), coordinatorNode);
ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
.withKafkaClient(client)
.withTimer(mockTimer)
.withCacheHelper(cacheHelper)
.build();
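// The generate callback fires when requests are built; batching is considered successful
// if more than two INITIALIZE handlers for this group were grouped under the coordinator node.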
AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
stateManager.setGenerateCallback(() -> {
Map<PersisterStateManager.RPCType, Map<String, List<PersisterStateManager.PersisterStateManagerHandler>>> handlersPerType = stateManager.nodeRPCMap().get(coordinatorNode);
if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.INITIALIZE) && handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE).containsKey(groupId)) {
if (handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE).get(groupId).size() > 2)
isBatchingSuccess.set(true);
}
});
stateManager.start();
CompletableFuture<InitializeShareGroupStateResponse> future = new CompletableFuture<>();
List<PersisterStateManager.InitializeStateHandler> handlers = new ArrayList<>();
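// Enqueue five handlers for the same group-topic-partition so they can be batched into one request.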
for (int i = 0; i < 5; i++) {
PersisterStateManager.InitializeStateHandler handler = spy(stateManager.new InitializeStateHandler(
groupId,
topicId,
partition,
stateEpoch,
startOffset,
future,
REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS,
MAX_RPC_RETRY_ATTEMPTS
));
handlers.add(handler);
stateManager.enqueue(handler);
}
CompletableFuture.allOf(handlers.stream()
.map(PersisterStateManager.InitializeStateHandler::result).toArray(CompletableFuture[]::new)).get();
TestUtils.waitForCondition(isBatchingSuccess::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
}
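
/**
 * Verifies the shutdown behavior of the PersisterStateManager.
 */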
@Test
public void testPersisterStateManagerClose() {
KafkaClient client = mock(KafkaClient.class);