KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#18968)

In this PR, we have added the share coordinator and KafkaApis side impl
of the intialize share group state RPC.
ref:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-02-22 21:42:08 +05:30 committed by GitHub
parent c6335c2ae8
commit 4f28973bd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 1007 additions and 160 deletions

View File

@ -34,7 +34,7 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
private final InitializeShareGroupStateRequestData data;
public Builder(InitializeShareGroupStateRequestData data) {
this(data, false);
this(data, true);
}
public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
@ -64,15 +64,15 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}
@Override
@ -82,8 +82,8 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateRequest(
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -17,13 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class InitializeShareGroupStateResponse extends AbstractResponse {
@ -43,9 +47,9 @@ public class InitializeShareGroupStateResponse extends AbstractResponse {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@ -62,7 +66,65 @@ public class InitializeShareGroupStateResponse extends AbstractResponse {
public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
public static InitializeShareGroupStateResponseData toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors error) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> initStateResults = new ArrayList<>();
request.topics().forEach(topicData -> {
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>();
topicData.partitions().forEach(partitionData -> partitionResults.add(
toErrorResponsePartitionResult(partitionData.partition(), error, error.message()))
);
initStateResults.add(toResponseInitializeStateResult(topicData.topicId(), partitionResults));
});
return new InitializeShareGroupStateResponseData().setResults(initStateResults);
}
public static InitializeShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(
int partitionId,
Errors error,
String errorMessage
) {
return new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
public static InitializeShareGroupStateResponseData.InitializeStateResult toResponseInitializeStateResult(
Uuid topicId,
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults
) {
return new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}
public static InitializeShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))
));
}
public static InitializeShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
return new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId);
}
public static InitializeShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
))
));
}
}

View File

@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = {
def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest]
// TODO: Implement the InitializeShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toGlobalErrorResponse(
initializeShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())
case Some(coordinator) => coordinator.initializeState(request.context, initializeShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(response))
}
}
}
}
def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {

View File

@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging {
})
}
@Test
def testInitializeShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)
val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava
val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)
val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
initStateResultData
)
assertNotNull(response.data)
assertEquals(1, response.data.results.size)
}
@Test
def testInitializeShareGroupStateAuthorizationFailed(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)
val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)
val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
initStateResultData
)
assertNotNull(response.data)
assertEquals(1, response.data.results.size)
response.data.results.forEach(deleteResult => {
assertEquals(1, deleteResult.partitions.size)
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode())
})
}
def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging {
}
response
}
def getInitializeShareGroupResponse(requestData: InitializeShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
initStateResult: util.List[InitializeShareGroupStateResponseData.InitializeStateResult]): InitializeShareGroupStateResponse = {
val requestChannelRequest = buildRequest(new InitializeShareGroupStateRequest.Builder(requestData, true).build())
val future = new CompletableFuture[InitializeShareGroupStateResponseData]()
when(shareCoordinator.initializeState(
any[RequestContext],
any[InitializeShareGroupStateRequestData]
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
future.complete(new InitializeShareGroupStateResponseData()
.setResults(initStateResult))
val response = verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest)
if (verifyNoErr) {
val expectedInitShareGroupStateResponseData = new InitializeShareGroupStateResponseData()
.setResults(initStateResult)
assertEquals(expectedInitShareGroupStateResponseData, response.data)
}
response
}
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -96,6 +98,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
/**
* Handle initialize share group state call
* @param context - represents the incoming initialize share group request context
* @param request - actual RPC request object
* @return completable future representing initialize share group RPC response data
*/
CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request);
/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)

View File

@ -24,6 +24,8 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -32,6 +34,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
@ -70,10 +73,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
@SuppressWarnings("ClassDataAbstractionCoupling")
public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorConfig config;
private final Logger log;
@ -682,11 +685,11 @@ public class ShareCoordinatorService implements ShareCoordinator {
(topicId, topicEntry) -> {
List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFuture) -> {
(partitionId, responseFut) -> {
// ResponseFut would already be completed by now since we have used
// CompletableFuture::allOf to create a combined future from the future map.
partitionResults.add(
responseFuture.getNow(null).results().get(0).partitions().get(0)
responseFut.getNow(null).results().get(0).partitions().get(0)
);
}
);
@ -792,11 +795,11 @@ public class ShareCoordinatorService implements ShareCoordinator {
(topicId, topicEntry) -> {
List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFuture) -> {
(partitionId, responseFut) -> {
// ResponseFut would already be completed by now since we have used
// CompletableFuture::allOf to create a combined future from the future map.
partitionResults.add(
responseFuture.getNow(null).results().get(0).partitions().get(0)
responseFut.getNow(null).results().get(0).partitions().get(0)
);
}
);
@ -808,6 +811,106 @@ public class ShareCoordinatorService implements ShareCoordinator {
});
}
@Override
public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
// Send an empty response if the coordinator is not active.
if (!isActive.get()) {
return CompletableFuture.completedFuture(
generateErrorInitStateResponse(
request,
Errors.COORDINATOR_NOT_AVAILABLE,
"Share coordinator is not available."
)
);
}
String groupId = request.groupId();
// Send an empty response if groupId is invalid.
if (isGroupIdEmpty(groupId)) {
log.error("Group id must be specified and non-empty: {}", request);
return CompletableFuture.completedFuture(
new InitializeShareGroupStateResponseData()
);
}
// Send an empty response if topic data is empty.
if (isEmpty(request.topics())) {
log.error("Topic Data is empty: {}", request);
return CompletableFuture.completedFuture(
new InitializeShareGroupStateResponseData()
);
}
// A map to store the futures for each topicId and partition.
Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>();
// The request received here could have multiple keys of structure group:topic:partition. However,
// the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
// be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass
// onto the shard method.
for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
Uuid topicId = topicData.topicId();
for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List.of(partitionData))));
CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation(
"initialize-share-group-state",
topicPartitionFor(coordinatorKey),
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
coordinator -> coordinator.initializeState(requestForCurrentPartition)
).exceptionally(initializeException ->
handleOperationException(
"initialize-share-group-state",
request,
initializeException,
(error, message) -> InitializeShareGroupStateResponse.toErrorResponseData(
topicData.topicId(),
partitionData.partition(),
error,
"Unable to initialize share group state: " + initializeException.getMessage()
),
log
));
futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
.put(partitionData.partition(), initializeFuture);
}
}
// Combine all futures into a single CompletableFuture<Void>.
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
.flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
// Transform the combined CompletableFuture<Void> into CompletableFuture<InitializeShareGroupStateResponseData>.
return combinedFuture.thenApply(v -> {
List<InitializeShareGroupStateResponseData.InitializeStateResult> initializeStateResult = new ArrayList<>(futureMap.size());
futureMap.forEach(
(topicId, topicEntry) -> {
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFut) -> {
// ResponseFut would already be completed by now since we have used
// CompletableFuture::allOf to create a combined future from the future map.
partitionResults.add(
responseFut.getNow(null).results().get(0).partitions().get(0)
);
}
);
initializeStateResult.add(InitializeShareGroupStateResponse.toResponseInitializeStateResult(topicId, partitionResults));
}
);
return new InitializeShareGroupStateResponseData()
.setResults(initializeStateResult);
});
}
private ReadShareGroupStateResponseData generateErrorReadStateResponse(
ReadShareGroupStateRequestData request,
Errors error,
@ -820,9 +923,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> ReadShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).collect(Collectors.toList()));
)).toList());
return resultData;
}).collect(Collectors.toList()));
}).toList());
}
private ReadShareGroupStateSummaryResponseData generateErrorReadStateSummaryResponse(
@ -837,9 +940,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).collect(Collectors.toList()));
)).toList());
return resultData;
}).collect(Collectors.toList()));
}).toList());
}
private WriteShareGroupStateResponseData generateErrorWriteStateResponse(
@ -855,9 +958,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> WriteShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).collect(Collectors.toList()));
)).toList());
return resultData;
}).collect(Collectors.toList()));
}).toList());
}
private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
@ -872,9 +975,26 @@ public class ShareCoordinatorService implements ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).collect(Collectors.toList()));
)).toList());
return resultData;
}).collect(Collectors.toList()));
}).toList());
}
private InitializeShareGroupStateResponseData generateErrorInitStateResponse(
InitializeShareGroupStateRequestData request,
Errors error,
String errorMessage
) {
return new InitializeShareGroupStateResponseData().setResults(request.topics().stream()
.map(topicData -> {
InitializeShareGroupStateResponseData.InitializeStateResult resultData = new InitializeShareGroupStateResponseData.InitializeStateResult();
resultData.setTopicId(topicData.topicId());
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData -> InitializeShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
)).toList());
return resultData;
}).toList());
}
private static boolean isGroupIdEmpty(String groupId) {

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -31,6 +33,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.TransactionResult;
@ -66,7 +69,6 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
private final Logger log;
@ -317,16 +319,12 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
CoordinatorRecord record = generateShareStateRecord(partitionData, key);
// build successful response if record is correctly created
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
.setResults(
Collections.singletonList(
WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
Collections.singletonList(
WriteShareGroupStateResponse.toResponsePartitionResult(
key.partition()
))
))
);
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults(
List.of(WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
List.of(WriteShareGroupStateResponse.toResponsePartitionResult(
key.partition()))
))
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
@ -346,7 +344,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
// Only one key will be there in the request by design.
Optional<ReadShareGroupStateResponseData> error = maybeGetReadStateError(request);
if (error.isPresent()) {
return new CoordinatorResult<>(Collections.emptyList(), error.get());
return new CoordinatorResult<>(List.of(), error.get());
}
ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0);
@ -366,7 +364,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
partitionId,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
Collections.emptyList()
List.of()
);
} else {
// Leader epoch update might be needed
@ -379,7 +377,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setLastOffset(stateBatch.lastOffset())
.setDeliveryState(stateBatch.deliveryState())
.setDeliveryCount(stateBatch.deliveryCount())
).collect(Collectors.toList()) : Collections.emptyList();
).toList() : List.of();
responseData = ReadShareGroupStateResponse.toResponseData(
topicId,
@ -393,7 +391,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
// Optimization in case leaderEpoch update is not required.
if (leaderEpoch == -1 ||
(leaderEpochMap.get(key) != null && leaderEpochMap.get(key) == leaderEpoch)) {
return new CoordinatorResult<>(Collections.emptyList(), responseData);
return new CoordinatorResult<>(List.of(), responseData);
}
// It is OK to info log this since this reaching this codepoint should be quite infrequent.
@ -406,7 +404,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
WriteShareGroupStateRequestData.PartitionData writePartitionData = new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionId)
.setLeaderEpoch(leaderEpoch)
.setStateBatches(Collections.emptyList())
.setStateBatches(List.of())
.setStartOffset(responseData.results().get(0).partitions().get(0).startOffset())
.setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
@ -471,7 +469,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
}
return new CoordinatorResult<>(Collections.emptyList(), responseData);
return new CoordinatorResult<>(List.of(), responseData);
}
/**
@ -482,7 +480,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
*/
public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
return new CoordinatorResult<>(
Collections.emptyList(),
List.of(),
this.offsetsManager.lastRedundantOffset()
);
}
@ -494,7 +492,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* the request data which covers only key i.e. group1:topic1:partition1. The implementation
* below was done keeping this in mind.
*
* @param request - ReadShareGroupStateSummaryRequestData for a single key
* @param request - DeleteShareGroupStateRequestData for a single key
* @return CoordinatorResult(records, response)
*/
@ -514,22 +512,53 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
CoordinatorRecord record = generateTombstoneRecord(key);
// build successful response if record is correctly created
DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData()
.setResults(
List.of(
DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
List.of(
DeleteShareGroupStateResponse.toResponsePartitionResult(
key.partition()
)
)
)
)
);
DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults(
List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
key.partition()))
))
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
/**
* This method writes a share snapshot records corresponding to the requested topic partitions.
* <p>
* This method as called by the ShareCoordinatorService will be provided with
* the request data which covers only key i.e. group1:topic1:partition1. The implementation
* below was done keeping this in mind.
*
* @param request - InitializeShareGroupStateRequestData for a single key
* @return CoordinatorResult(records, response)
*/
public CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> initializeState(
InitializeShareGroupStateRequestData request
) {
// Records to write (with both key and value of snapshot type), response to caller
// only one key will be there in the request by design.
Optional<CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetInitializeStateError(request);
if (error.isPresent()) {
return error.get();
}
InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0);
InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
CoordinatorRecord record = generateInitializeStateRecord(partitionData, key);
// build successful response if record is correctly created
InitializeShareGroupStateResponseData responseData = new InitializeShareGroupStateResponseData().setResults(
List.of(InitializeShareGroupStateResponse.toResponseInitializeStateResult(key.topicId(),
List.of(InitializeShareGroupStateResponse.toResponsePartitionResult(
key.partition()))
))
);
return new CoordinatorResult<>(List.of(record), responseData);
}
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
@ -555,7 +584,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
.setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
.setStateBatches(mergeBatches(List.of(), partitionData))
.build());
} else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
@ -587,7 +616,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
.setStateBatches(mergeBatches(List.of(), partitionData))
.build());
}
}
@ -600,6 +629,24 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
);
}
private CoordinatorRecord generateInitializeStateRecord(
InitializeShareGroupStateRequestData.PartitionData partitionData,
SharePartitionKey key
) {
// We need to create a new share snapshot here, with
// appropriate state information. We will not be merging
// state here with previous snapshots as init state implies
// fresh start.
int snapshotEpoch = shareStateMap.containsKey(key) ? shareStateMap.get(key).snapshotEpoch() + 1 : 0;
return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
key.groupId(),
key.topicId(),
key.partition(),
ShareGroupOffset.fromRequest(partitionData, snapshotEpoch)
);
}
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData) {
@ -609,15 +656,13 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData,
long startOffset) {
return new PersisterStateBatchCombiner(
soFar,
partitionData.stateBatches().stream()
.map(PersisterStateBatch::from)
.collect(Collectors.toList()),
long startOffset
) {
return new PersisterStateBatchCombiner(soFar, partitionData.stateBatches().stream()
.map(PersisterStateBatch::from)
.toList(),
startOffset
)
.combineStateBatches();
).combineStateBatches();
}
private Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> maybeGetWriteStateError(
@ -631,30 +676,30 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
int partitionId = partitionData.partition();
if (topicId == null) {
return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
}
SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
if (partitionData.leaderEpoch() != -1 && leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
log.error("Request leader epoch smaller than last recorded.");
return Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
}
if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
log.error("Request state epoch smaller than last recorded.");
return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null) {
log.error("Topic/TopicPartition not found in metadata image.");
return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
return Optional.empty();
@ -743,28 +788,65 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
int partitionId = partitionData.partition();
if (topicId == null) {
return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
return Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
return Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
return Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null) {
log.error("Topic/TopicPartition not found in metadata image.");
return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
return Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
return Optional.empty();
}
private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorResponse(
private Optional<CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord>> maybeGetInitializeStateError(
InitializeShareGroupStateRequestData request
) {
InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0);
InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
Uuid topicId = topicData.topicId();
int partitionId = partitionData.partition();
if (topicId == null) {
return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
}
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) && stateEpochMap.get(key) > partitionData.stateEpoch()) {
log.error("Initialize request state epoch smaller than last recorded.");
return Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null) {
log.error("Topic/TopicPartition not found in metadata image.");
return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
}
return Optional.empty();
}
private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorCoordinatorResult(
Errors error,
Exception exception,
Uuid topicId,
@ -772,10 +854,10 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
) {
String message = exception == null ? error.message() : exception.getMessage();
WriteShareGroupStateResponseData responseData = WriteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
return new CoordinatorResult<>(Collections.emptyList(), responseData);
return new CoordinatorResult<>(List.of(), responseData);
}
private CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> getDeleteErrorResponse(
private CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> getDeleteErrorCoordinatorResult(
Errors error,
Exception exception,
Uuid topicId,
@ -783,7 +865,18 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
) {
String message = exception == null ? error.message() : exception.getMessage();
DeleteShareGroupStateResponseData responseData = DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
return new CoordinatorResult<>(Collections.emptyList(), responseData);
return new CoordinatorResult<>(List.of(), responseData);
}
private CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> getInitializeErrorCoordinatorResult(
Errors error,
Exception exception,
Uuid topicId,
int partitionId
) {
String message = exception == null ? error.message() : exception.getMessage();
InitializeShareGroupStateResponseData responseData = InitializeShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
return new CoordinatorResult<>(List.of(), responseData);
}
// Visible for testing
@ -820,7 +913,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(new PersisterStateBatchCombiner(currentBatches, newData.stateBatches().stream()
.map(ShareCoordinatorShard::toPersisterStateBatch)
.collect(Collectors.toList()), newStartOffset)
.toList(), newStartOffset)
.combineStateBatches())
.build();
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
@ -94,13 +95,29 @@ public class ShareGroupOffset {
}
public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.PartitionData data, int snapshotEpoch) {
return new ShareGroupOffset(snapshotEpoch,
return new ShareGroupOffset(
snapshotEpoch,
data.stateEpoch(),
data.leaderEpoch(),
data.startOffset(),
data.stateBatches().stream()
.map(PersisterStateBatch::from)
.collect(Collectors.toList()));
.toList()
);
}
public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.PartitionData data) {
return fromRequest(data, 0);
}
public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int snapshotEpoch) {
return new ShareGroupOffset(
snapshotEpoch,
data.stateEpoch(),
-1,
data.startOffset(),
List.of()
);
}
public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {

View File

@ -26,6 +26,8 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -51,7 +53,6 @@ import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -133,7 +134,7 @@ class ShareCoordinatorServiceTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -199,7 +200,7 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
@ -210,7 +211,7 @@ class ShareCoordinatorServiceTest {
.setPartition(partition1)))));
assertEquals(expectedResult, result);
verify(time, times(2)).hiResClockMs();
Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
Set<MetricName> expectedMetrics = new HashSet<>(List.of(
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP),
@ -243,7 +244,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -283,7 +284,7 @@ class ShareCoordinatorServiceTest {
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
.setStartOffset(0)
.setStateBatches(Arrays.asList(
.setStateBatches(List.of(
new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
@ -315,7 +316,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<ReadShareGroupStateResponseData.ReadStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<ReadShareGroupStateResponseData.ReadStateResult> expectedResult = new HashSet<>(List.of(
topicData1,
topicData2));
assertEquals(expectedResult, result);
@ -345,23 +346,20 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
))
)
);
.setTopics(List.of(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1))),
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
.setPartitions(List.of(
new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)))
));
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
@ -385,12 +383,12 @@ class ShareCoordinatorServiceTest {
eq("read-share-group-state-summary"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
any(),
any()
))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(topicData1))))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(topicData2))));
any())
).thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(topicData1)))
).thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(topicData2)))
);
CompletableFuture<ReadShareGroupStateSummaryResponseData> future = service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@ -399,7 +397,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> expectedResult = new HashSet<>(List.of(
topicData1,
topicData2));
assertEquals(expectedResult, result);
@ -432,21 +430,18 @@ class ShareCoordinatorServiceTest {
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
))
)
);
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition1))),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)))
));
DeleteShareGroupStateResponseData response1 = new DeleteShareGroupStateResponseData()
.setResults(List.of(
@ -469,9 +464,7 @@ class ShareCoordinatorServiceTest {
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
eq(Duration.ofMillis(5000)),
any()
))
.thenReturn(CompletableFuture.completedFuture(response1))
.thenReturn(CompletableFuture.completedFuture(response2));
)).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2));
CompletableFuture<DeleteShareGroupStateResponseData> future = service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
@ -480,7 +473,7 @@ class ShareCoordinatorServiceTest {
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
@ -492,6 +485,91 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
@Test
public void testInitializeStateSuccess() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Metrics metrics = new Metrics();
ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
Time time = mock(Time.class);
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
coordinatorMetrics,
time,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId1 = Uuid.randomUuid();
int partition1 = 0;
Uuid topicId2 = Uuid.randomUuid();
int partition2 = 1;
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStartOffset(0)
.setStateEpoch(1))),
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStartOffset(5)
.setStateEpoch(1)))
));
InitializeShareGroupStateResponseData response1 = new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))
));
InitializeShareGroupStateResponseData response2 = new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)))
));
when(runtime.scheduleWriteOperation(
eq("initialize-share-group-state"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
eq(Duration.ofMillis(5000)),
any())).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2)
);
CompletableFuture<InitializeShareGroupStateResponseData> future = service.initializeState(
requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
request
);
HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> expectedResult = new HashSet<>(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition2))),
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))
));
assertEquals(expectedResult, result);
}
@Test
public void testWriteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -680,6 +758,53 @@ class ShareCoordinatorServiceTest {
);
}
@Test
public void testInitializeStateValidationError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
// 1. Empty topicsData
assertEquals(new InitializeShareGroupStateResponseData(),
service.initializeState(
requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
new InitializeShareGroupStateRequestData().setGroupId(groupId)
).get(5, TimeUnit.SECONDS)
);
// 2. Empty partitionsData
assertEquals(new InitializeShareGroupStateResponseData(),
service.initializeState(
requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
new InitializeShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
// 3. Invalid groupId
assertEquals(new InitializeShareGroupStateResponseData(),
service.initializeState(
requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
new InitializeShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
new InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId).setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
}
@Test
public void testWriteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -702,7 +827,7 @@ class ShareCoordinatorServiceTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -743,7 +868,7 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<WriteShareGroupStateResponseData.WriteStateResult> expectedResult = new HashSet<>(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
@ -781,7 +906,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -806,7 +931,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<ReadShareGroupStateResponseData.ReadStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<ReadShareGroupStateResponseData.ReadStateResult> expectedResult = new HashSet<>(List.of(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
@ -844,7 +969,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -869,7 +994,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> expectedResult = new HashSet<>(List.of(
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
@ -907,7 +1032,7 @@ class ShareCoordinatorServiceTest {
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Arrays.asList(
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@ -930,7 +1055,7 @@ class ShareCoordinatorServiceTest {
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(Arrays.asList(
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> expectedResult = new HashSet<>(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
@ -946,6 +1071,66 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
@Test
public void testInitializeStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
String groupId = "group1";
Uuid topicId1 = Uuid.randomUuid();
int partition1 = 0;
Uuid topicId2 = Uuid.randomUuid();
int partition2 = 1;
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
)),
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
))
));
CompletableFuture<InitializeShareGroupStateResponseData> future = service.initializeState(
requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
request
);
HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> expectedResult = new HashSet<>(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId1)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available.")))));
assertEquals(expectedResult, result);
}
@Test
public void testWriteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -1124,6 +1309,47 @@ class ShareCoordinatorServiceTest {
);
}
@Test
public void testInitializeFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any(), any())).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(
new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Unable to initialize share group state: This server does not host this topic-partition.")))
)),
service.initializeState(requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
new InitializeShareGroupStateRequestData().setGroupId(groupId)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
))
).get(5, TimeUnit.SECONDS)
);
}
@Test
public void testTopicPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -1676,7 +1902,7 @@ class ShareCoordinatorServiceTest {
}
private void checkMetrics(Metrics metrics) {
Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
Set<MetricName> usualMetrics = new HashSet<>(List.of(
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP),
@ -1687,17 +1913,12 @@ class ShareCoordinatorServiceTest {
}
private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) {
boolean isPresent = metrics.metrics().containsKey(
metrics.metricName(
"last-pruned-offset",
ShareCoordinatorMetrics.METRICS_GROUP,
"The offset at which the share-group state topic was last pruned.",
Map.of(
"topic", topic,
"partition", Integer.toString(partition)
)
)
);
boolean isPresent = metrics.metrics().containsKey(metrics.metricName(
"last-pruned-offset",
ShareCoordinatorMetrics.METRICS_GROUP,
"The offset at which the share-group state topic was last pruned.",
Map.of("topic", topic, "partition", Integer.toString(partition))
));
assertEquals(checkPresence, isPresent);
}
}

View File

@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@ -29,6 +31,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@ -1173,6 +1176,184 @@ class ShareCoordinatorShardTest {
verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
}
@Test
public void testInitializeStateSuccess() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(10)
.setStateEpoch(5)))
));
assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result = shard.initializeState(request);
result.records().forEach(record -> shard.replay(0L, 0L, (short) 0, record));
InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = List.of(
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
));
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
}
@Test
public void testInitializeStateInvalidRequestData() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
// invalid partition
int partition = -1;
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
));
CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result = shard.initializeState(request);
InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
// invalid state epoch
int stateEpoch = 1;
partition = 0;
shard.replay(0L, 0L, (short) 0, ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, partition, new ShareGroupOffset.Builder()
.setStateEpoch(5)
.setSnapshotEpoch(0)
.setStateBatches(List.of())
.build()
));
request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
));
result = shard.initializeState(request);
expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception().getMessage());
expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
}
@Test
public void testInitializeNullMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
shard.onNewMetadataImage(null, null);
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(0)
))
));
CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result = shard.initializeState(request);
InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
}
@Test
public void testInitializeTopicIdNonExistentInMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
MetadataImage image = mock(MetadataImage.class);
shard.onNewMetadataImage(image, null);
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(0)
))
));
// topic id not found in cache
TopicsImage topicsImage = mock(TopicsImage.class);
when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(null);
when(image.topics()).thenReturn(topicsImage);
CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result = shard.initializeState(request);
InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
}
@Test
public void testInitializePartitionIdNonExistentInMetadataImage() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
MetadataImage image = mock(MetadataImage.class);
shard.onNewMetadataImage(image, null);
InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(0)
))
));
// topic id found in cache
TopicsImage topicsImage = mock(TopicsImage.class);
when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(mock(TopicImage.class));
when(image.topics()).thenReturn(topicsImage);
// partition id not found
when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(null);
CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result = shard.initializeState(request);
InitializeShareGroupStateResponseData expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
}
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);