KAFKA-17914: Update string ref with SharePartitionKey. (#17660)

Currently, we are using the String repr of the shareCoordinator/sharePartition key (groupId:topicId:parition) as defined in kip-932 in a few methods like ShareCoordinator.partitionFor and ShareCoordinatorMetadataCacheHelper.getShareCoordinator.
This has the potential to introduce subtle bugs when incorrect strings are used to invoke these methods. What is perturbing is that the failures might be intermittent.
This PR aims to remedy the situation by changing the type to the concrete SharePartitionKey. This way callers need not be worried about a specific encoding or format of the coordinator key as long as the SharePartitionKey has the correct fields set.
There is one issue - the FIND_COORDINATOR RPC does require the coordinator key to be set as a String in the request body. We can't get around this and have to set the value as String. However, on the KafkaApis handler side we parse this key into a SharePartitionKey again and gain compile time safety. One downside is that we need to split and format the incoming coordinator key in the request but that can be encapsulated at a single location in SharePartitionKey.
Added tests for partitionFor.

Reviewers: Andrew Schofield <aschofield@confluent.io>,  Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Sushant Mahajan 2024-11-08 15:05:39 +05:30 committed by GitHub
parent b9976437e1
commit 2e2b0a58ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 162 additions and 64 deletions

View File

@ -76,7 +76,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_I
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.ErroneousAndValidPartitionData
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.AppendOrigin
@ -1645,6 +1645,13 @@ class KafkaApis(val requestChannel: RequestChannel,
if (shareCoordinator.isEmpty) {
return (Errors.INVALID_REQUEST, Node.noNode)
}
try {
SharePartitionKey.validate(key)
} catch {
case e: IllegalArgumentException =>
error(s"Share coordinator key is invalid", e)
(Errors.INVALID_REQUEST, Node.noNode())
}
}
val (partition, internalTopicName) = CoordinatorType.forId(keyType) match {
case CoordinatorType.GROUP =>
@ -1654,8 +1661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
case CoordinatorType.SHARE =>
// None check already done above
(shareCoordinator.get.partitionFor(key), SHARE_GROUP_STATE_TOPIC_NAME)
(shareCoordinator.foreach(coordinator => coordinator.partitionFor(SharePartitionKey.getInstance(key))), SHARE_GROUP_STATE_TOPIC_NAME)
}
val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import java.util.HashSet;
@ -38,12 +39,12 @@ import scala.jdk.javaapi.OptionConverters;
public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
private final MetadataCache metadataCache;
private final Function<String, Integer> keyToPartitionMapper;
private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
private final ListenerName interBrokerListenerName;
public ShareCoordinatorMetadataCacheHelperImpl(
MetadataCache metadataCache,
Function<String, Integer> keyToPartitionMapper,
Function<SharePartitionKey, Integer> keyToPartitionMapper,
ListenerName interBrokerListenerName
) {
Objects.requireNonNull(metadataCache, "metadataCache must not be null");
@ -61,7 +62,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinator
}
@Override
public Node getShareCoordinator(String key, String internalTopicName) {
public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
if (metadataCache.contains(internalTopicName)) {
Set<String> topicSet = new HashSet<>();
topicSet.add(internalTopicName);

View File

@ -1340,6 +1340,10 @@ class KafkaApisTest extends Logging {
topicConfigOverride.put(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
val groupId = "group"
val topicId = Uuid.randomUuid
val partition = 0
var key:String = groupId
val topicName =
coordinatorType match {
case CoordinatorType.GROUP =>
@ -1359,6 +1363,7 @@ class KafkaApisTest extends Logging {
case CoordinatorType.SHARE =>
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
key = "%s:%s:%d" format(groupId, topicId, partition)
Topic.SHARE_GROUP_STATE_TOPIC_NAME
case _ =>
throw new IllegalStateException(s"Unknown coordinator type $coordinatorType")
@ -1368,12 +1373,12 @@ class KafkaApisTest extends Logging {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(coordinatorType.id())
.setCoordinatorKeys(asList(groupId)))
.setCoordinatorKeys(asList(key)))
} else {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(coordinatorType.id())
.setKey(groupId))
.setKey(key))
}
val request = buildRequest(findCoordinatorRequestBuilder.build(requestHeader.apiVersion))
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
@ -1389,7 +1394,7 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.INVALID_REQUEST.code, response.data.coordinators.get(0).errorCode)
} else if (version >= 4) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode)
assertEquals(groupId, response.data.coordinators.get(0).key)
assertEquals(key, response.data.coordinators.get(0).key)
} else {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode)
assertTrue(capturedRequest.getValue.isEmpty)

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.OptionalInt;
import java.util.Properties;
@ -39,10 +40,10 @@ public interface ShareCoordinator {
/**
* Return the partition index for the given key.
*
* @param key - groupId:topicId:partitionId.
* @param key - reference to {@link SharePartitionKey}.
* @return The partition index.
*/
int partitionFor(String key);
int partitionFor(SharePartitionKey key);
/**
* Return the configuration properties of the share-group state topic.

View File

@ -201,9 +201,12 @@ public class ShareCoordinatorService implements ShareCoordinator {
}
@Override
public int partitionFor(String key) {
public int partitionFor(SharePartitionKey key) {
throwIfNotActive();
return Utils.abs(key.hashCode()) % numPartitions;
// Call to asCoordinatorKey is necessary as we depend only on topicId (Uuid) and
// not topic name. We do not want this calculation to distinguish between 2
// SharePartitionKeys where everything except topic name is the same.
return Utils.abs(key.asCoordinatorKey().hashCode()) % numPartitions;
}
@Override
@ -539,7 +542,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
}
TopicPartition topicPartitionFor(SharePartitionKey key) {
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key.asCoordinatorKey()));
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key));
}
private static <P> boolean isEmpty(List<P> list) {

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
@ -31,6 +32,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
@ -52,6 +54,7 @@ import java.util.concurrent.TimeoutException;
import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -637,4 +640,38 @@ class ShareCoordinatorServiceTest {
assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic());
assertEquals(expectedPartition, tp.partition());
}
@Test
public void testPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM
);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
// inactive shard should throw exception
assertThrows(CoordinatorNotAvailableException.class, () -> service.partitionFor(SharePartitionKey.getInstance(groupId, topicId, partition)));
final int numPartitions = 50;
service.startup(() -> numPartitions);
final SharePartitionKey key1 = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, partition, null));
int sharePartitionKey = service.partitionFor(key1);
assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey);
// The presence of a topic name should not affect the choice of partition
final SharePartitionKey key2 = new SharePartitionKey(groupId, new TopicIdPartition(topicId, partition, "whatever"));
sharePartitionKey = service.partitionFor(key2);
assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey);
// asCoordinatorKey does not discriminate on topic name
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
}
}

View File

@ -65,6 +65,56 @@ public class SharePartitionKey {
return getInstance(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
}
/**
* Returns a SharePartitionKey from input string of format - groupId:topicId:partition
* @param key - String in format groupId:topicId:partition
* @return object representing SharePartitionKey
* @throws IllegalArgumentException if the key is empty or has invalid format
*/
public static SharePartitionKey getInstance(String key) {
validate(key);
String[] tokens = key.split(":");
return new SharePartitionKey(
tokens[0].trim(),
Uuid.fromString(tokens[1]),
Integer.parseInt(tokens[2])
);
}
/**
* Validates whether the String argument has a valid SharePartitionKey format - groupId:topicId:partition
* @param key - String in format groupId:topicId:partition
* @throws IllegalArgumentException if the key is empty or has invalid format
*/
public static void validate(String key) {
Objects.requireNonNull(key, "Share partition key cannot be null");
if (key.isEmpty()) {
throw new IllegalArgumentException("Share partition key cannot be empty");
}
String[] tokens = key.split(":");
if (tokens.length != 3) {
throw new IllegalArgumentException("Invalid key format: expected - groupId:topicId:partition, found - " + key);
}
if (tokens[0].trim().isEmpty()) {
throw new IllegalArgumentException("GroupId must be alphanumeric string");
}
try {
Uuid.fromString(tokens[1]);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid topic ID: " + tokens[1], e);
}
try {
Integer.parseInt(tokens[2]);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid partition: " + tokens[2], e);
}
}
public static SharePartitionKey getInstance(String groupId, Uuid topicId, int partition) {
return new SharePartitionKey(groupId, topicId, partition);
}
@ -97,7 +147,7 @@ public class SharePartitionKey {
@Override
public String toString() {
return "SharePartitionKey{" +
"groupId='" + groupId +
"groupId=" + groupId +
", topicIdPartition=" + topicIdPartition +
'}';
}

View File

@ -206,12 +206,10 @@ public class PersisterStateManager {
*/
public abstract class PersisterStateManagerHandler implements RequestCompletionHandler {
protected Node coordinatorNode;
protected final String groupId;
protected final Uuid topicId;
protected final int partition;
private final BackoffManager findCoordBackoff;
protected final Logger log = LoggerFactory.getLogger(getClass());
private Consumer<ClientResponse> onCompleteCallback;
protected final SharePartitionKey partitionKey;
public PersisterStateManagerHandler(
String groupId,
@ -221,12 +219,10 @@ public class PersisterStateManager {
long backoffMaxMs,
int maxRPCRetryAttempts
) {
this.groupId = groupId;
this.topicId = topicId;
this.partition = partition;
this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
this.onCompleteCallback = response -> {
}; // noop
partitionKey = SharePartitionKey.getInstance(groupId, topicId, partition);
}
/**
@ -290,7 +286,7 @@ public class PersisterStateManager {
protected AbstractRequest.Builder<FindCoordinatorRequest> findShareCoordinatorBuilder() {
return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id())
.setKey(coordinatorKey()));
.setKey(partitionKey().asCoordinatorKey()));
}
public void addRequestToNodeMap(Node node, PersisterStateManagerHandler handler) {
@ -300,7 +296,7 @@ public class PersisterStateManager {
synchronized (nodeMapLock) {
nodeRPCMap.computeIfAbsent(node, k -> new HashMap<>())
.computeIfAbsent(handler.rpcType(), k -> new HashMap<>())
.computeIfAbsent(handler.groupId, k -> new LinkedList<>())
.computeIfAbsent(partitionKey().groupId(), k -> new LinkedList<>())
.add(handler);
}
sender.wakeup();
@ -317,7 +313,7 @@ public class PersisterStateManager {
}
if (cacheHelper.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME)) {
log.debug("{} internal topic already exists.", Topic.SHARE_GROUP_STATE_TOPIC_NAME);
Node node = cacheHelper.getShareCoordinator(coordinatorKey(), Topic.SHARE_GROUP_STATE_TOPIC_NAME);
Node node = cacheHelper.getShareCoordinator(partitionKey(), Topic.SHARE_GROUP_STATE_TOPIC_NAME);
if (node != Node.noNode()) {
log.debug("Found coordinator node in cache: {}", node);
coordinatorNode = node;
@ -333,8 +329,8 @@ public class PersisterStateManager {
*
* @return String
*/
protected String coordinatorKey() {
return SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
protected SharePartitionKey partitionKey() {
return partitionKey;
}
/**
@ -378,7 +374,7 @@ public class PersisterStateManager {
findCoordBackoff.incrementAttempt();
List<FindCoordinatorResponseData.Coordinator> coordinators = ((FindCoordinatorResponse) response.responseBody()).coordinators();
if (coordinators.size() != 1) {
log.error("Find coordinator response for {} is invalid", coordinatorKey());
log.error("Find coordinator response for {} is invalid", partitionKey());
findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new IllegalStateException("Invalid response with multiple coordinators."));
return;
}
@ -399,12 +395,12 @@ public class PersisterStateManager {
}
break;
case COORDINATOR_NOT_AVAILABLE: // retryable error codes
case COORDINATOR_NOT_AVAILABLE: // retriable error codes
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
log.warn("Received retryable error in find coordinator: {}", error.message());
log.warn("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message());
if (!findCoordBackoff.canAttempt()) {
log.error("Exhausted max retries to find coordinator without success.");
log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey());
findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success."));
break;
}
@ -413,7 +409,7 @@ public class PersisterStateManager {
break;
default:
log.error("Unable to find coordinator.");
log.error("Unable to find coordinator for {} using key {}.", name(), partitionKey());
findCoordinatorErrorResponse(error, null);
}
}
@ -517,9 +513,9 @@ public class PersisterStateManager {
WriteShareGroupStateResponse combinedResponse = (WriteShareGroupStateResponse) response.responseBody();
for (WriteShareGroupStateResponseData.WriteStateResult writeStateResult : combinedResponse.data().results()) {
if (writeStateResult.topicId().equals(topicId)) {
if (writeStateResult.topicId().equals(partitionKey().topicId())) {
Optional<WriteShareGroupStateResponseData.PartitionResult> partitionStateData =
writeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition)
writeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
.findFirst();
if (partitionStateData.isPresent()) {
@ -528,20 +524,20 @@ public class PersisterStateManager {
case NONE:
writeStateBackoff.resetAttempts();
WriteShareGroupStateResponseData.WriteStateResult result = WriteShareGroupStateResponse.toResponseWriteStateResult(
topicId,
partitionKey().topicId(),
Collections.singletonList(partitionStateData.get())
);
this.result.complete(new WriteShareGroupStateResponse(
new WriteShareGroupStateResponseData().setResults(Collections.singletonList(result))));
return;
// check retryable errors
// check retriable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
log.warn("Received retryable error in write state RPC: {}", error.message());
log.warn("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message());
if (!writeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for write state RPC without success.");
log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey());
writeStateErrorResponse(error, new Exception("Exhausted max retries to complete write state RPC without success."));
return;
}
@ -550,7 +546,7 @@ public class PersisterStateManager {
return;
default:
log.error("Unable to perform write state RPC: {}", error.message());
log.error("Unable to perform write state RPC for key {}: {}", partitionKey(), error.message());
writeStateErrorResponse(error, null);
return;
}
@ -560,21 +556,21 @@ public class PersisterStateManager {
// no response found specific topic partition
IllegalStateException exception = new IllegalStateException(
"Failed to write state for partition " + partition + " in topic " + topicId + " for group " + groupId
"Failed to write state for share partition: " + partitionKey()
);
writeStateErrorResponse(Errors.forException(exception), exception);
}
private void writeStateErrorResponse(Errors error, Exception exception) {
this.result.complete(new WriteShareGroupStateResponse(
WriteShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in write state RPC. " +
WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in write state RPC. " +
(exception == null ? error.message() : exception.getMessage()))));
}
@Override
protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
this.result.complete(new WriteShareGroupStateResponse(
WriteShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. " +
WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " +
(exception == null ? error.message() : exception.getMessage()))));
}
@ -595,7 +591,6 @@ public class PersisterStateManager {
public class ReadStateHandler extends PersisterStateManagerHandler {
private final int leaderEpoch;
private final String coordinatorKey;
private final CompletableFuture<ReadShareGroupStateResponse> result;
private final BackoffManager readStateBackoff;
@ -612,7 +607,6 @@ public class PersisterStateManager {
) {
super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
this.leaderEpoch = leaderEpoch;
this.coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
this.result = result;
this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
}
@ -660,9 +654,9 @@ public class PersisterStateManager {
ReadShareGroupStateResponse combinedResponse = (ReadShareGroupStateResponse) response.responseBody();
for (ReadShareGroupStateResponseData.ReadStateResult readStateResult : combinedResponse.data().results()) {
if (readStateResult.topicId().equals(topicId)) {
if (readStateResult.topicId().equals(partitionKey().topicId())) {
Optional<ReadShareGroupStateResponseData.PartitionResult> partitionStateData =
readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition)
readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
.findFirst();
if (partitionStateData.isPresent()) {
@ -671,20 +665,20 @@ public class PersisterStateManager {
case NONE:
readStateBackoff.resetAttempts();
ReadShareGroupStateResponseData.ReadStateResult result = ReadShareGroupStateResponse.toResponseReadStateResult(
topicId,
partitionKey().topicId(),
Collections.singletonList(partitionStateData.get())
);
this.result.complete(new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(result))));
return;
// check retryable errors
// check retriable errors
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
log.warn("Received retryable error in read state RPC: {}", error.message());
log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message());
if (!readStateBackoff.canAttempt()) {
log.error("Exhausted max retries for read state RPC without success.");
log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey());
readStateErrorReponse(error, new Exception("Exhausted max retries to complete read state RPC without success."));
return;
}
@ -693,7 +687,7 @@ public class PersisterStateManager {
return;
default:
log.error("Unable to perform read state RPC: {}", error.message());
log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), error.message());
readStateErrorReponse(error, null);
return;
}
@ -703,21 +697,21 @@ public class PersisterStateManager {
// no response found specific topic partition
IllegalStateException exception = new IllegalStateException(
"Failed to read state for partition " + partition + " in topic " + topicId + " for group " + groupId
"Failed to read state for share partition " + partitionKey()
);
readStateErrorReponse(Errors.forException(exception), exception);
}
protected void readStateErrorReponse(Errors error, Exception exception) {
this.result.complete(new ReadShareGroupStateResponse(
ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. " +
ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " +
(exception == null ? error.message() : exception.getMessage()))));
}
@Override
protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
this.result.complete(new ReadShareGroupStateResponse(
ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in read state RPC. " +
ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state RPC. " +
(exception == null ? error.message() : exception.getMessage()))));
}
@ -928,10 +922,10 @@ public class PersisterStateManager {
handlers.forEach(persHandler -> {
assert persHandler instanceof WriteStateHandler;
WriteStateHandler handler = (WriteStateHandler) persHandler;
partitionData.computeIfAbsent(handler.topicId, topicId -> new LinkedList<>())
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>())
.add(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(handler.partition)
.setPartition(handler.partitionKey().partition())
.setStateEpoch(handler.stateEpoch)
.setLeaderEpoch(handler.leaderEpoch)
.setStartOffset(handler.startOffset)
@ -959,10 +953,10 @@ public class PersisterStateManager {
handlers.forEach(persHandler -> {
assert persHandler instanceof ReadStateHandler;
ReadStateHandler handler = (ReadStateHandler) persHandler;
partitionData.computeIfAbsent(handler.topicId, topicId -> new LinkedList<>())
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>())
.add(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(handler.partition)
.setPartition(handler.partitionKey().partition())
.setLeaderEpoch(handler.leaderEpoch)
);
});

View File

@ -18,13 +18,14 @@
package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.Node;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.List;
public interface ShareCoordinatorMetadataCacheHelper {
boolean containsTopic(String topic);
Node getShareCoordinator(String key, String internalTopicName);
Node getShareCoordinator(SharePartitionKey key, String internalTopicName);
List<Node> getClusterNodes();
}

View File

@ -107,7 +107,7 @@ class DefaultStatePersisterTest {
}
@Override
public Node getShareCoordinator(String key, String internalTopicName) {
public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
return Node.noNode();
}

View File

@ -142,7 +142,7 @@ class PersisterStateManagerTest {
this.result.complete(new TestHandlerResponse(new TestHandlerResponseData()
.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setPartition(partitionKey().partition())
.setErrorMessage(Errors.NONE.message())
.setErrorCode(Errors.NONE.code()))
)
@ -159,9 +159,9 @@ class PersisterStateManagerTest {
protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
this.result.complete(new TestHandlerResponse(new TestHandlerResponseData()
.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setTopicId(partitionKey().topicId())
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setPartition(partitionKey().partition())
.setErrorMessage(exception == null ? error.message() : exception.getMessage())
.setErrorCode(error.code()))
)
@ -198,7 +198,7 @@ class PersisterStateManagerTest {
}
@Override
public Node getShareCoordinator(String key, String internalTopicName) {
public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
return Node.noNode();
}
@ -217,7 +217,7 @@ class PersisterStateManagerTest {
}
@Override
public Node getShareCoordinator(String key, String internalTopicName) {
public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
return coordinatorNode;
}