From 2e2b0a58eda3e677763af974a44a6aaa3c280214 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Fri, 8 Nov 2024 15:05:39 +0530 Subject: [PATCH] KAFKA-17914: Update string ref with SharePartitionKey. (#17660) Currently, we use the String representation of the share coordinator/share partition key (groupId:topicId:partition), as defined in KIP-932, in a few methods such as ShareCoordinator.partitionFor and ShareCoordinatorMetadataCacheHelper.getShareCoordinator. This has the potential to introduce subtle bugs when incorrect strings are used to invoke these methods. Worse, the failures might be intermittent. This PR remedies the situation by changing the argument type to the concrete SharePartitionKey. Callers then need not worry about a specific encoding or format of the coordinator key, as long as the SharePartitionKey has the correct fields set. There is one caveat - the FIND_COORDINATOR RPC requires the coordinator key to be set as a String in the request body. We cannot get around this and have to set the value as a String. However, on the KafkaApis handler side we parse this key back into a SharePartitionKey and gain compile-time safety. One downside is that we need to split and parse the incoming coordinator key from the request, but that logic can be encapsulated in a single location in SharePartitionKey. Added tests for partitionFor. Reviewers: Andrew Schofield, Apoorv Mittal, Manikumar Reddy --- .../main/scala/kafka/server/KafkaApis.scala | 12 ++- ...areCoordinatorMetadataCacheHelperImpl.java | 7 +- .../unit/kafka/server/KafkaApisTest.scala | 11 ++- .../coordinator/share/ShareCoordinator.java | 5 +- .../share/ShareCoordinatorService.java | 9 ++- .../share/ShareCoordinatorServiceTest.java | 37 +++++++++ .../kafka/server/share/SharePartitionKey.java | 52 ++++++++++++- .../persister/PersisterStateManager.java | 78 +++++++++---------- .../ShareCoordinatorMetadataCacheHelper.java | 3 +- .../persister/DefaultStatePersisterTest.java | 2 +- .../persister/PersisterStateManagerTest.java | 10 +-- 11 files changed, 162 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0cba5cf4e41..21942229d87 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -76,7 +76,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_I import org.apache.kafka.server.purgatory.TopicPartitionOperationKey import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.share.context.ShareFetchContext -import org.apache.kafka.server.share.ErroneousAndValidPartitionData +import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.storage.internals.log.AppendOrigin @@ -1645,6 +1645,13 @@ class KafkaApis(val requestChannel: RequestChannel, if (shareCoordinator.isEmpty) { return (Errors.INVALID_REQUEST, Node.noNode) } + try { + SharePartitionKey.validate(key) + } catch { + case e: IllegalArgumentException => + error(s"Share coordinator key is invalid", e) + (Errors.INVALID_REQUEST, Node.noNode()) + } } val (partition, internalTopicName) = CoordinatorType.forId(keyType) match { case CoordinatorType.GROUP => @@ -1654,8 +1661,7 @@
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME) case CoordinatorType.SHARE => - // None check already done above - (shareCoordinator.get.partitionFor(key), SHARE_GROUP_STATE_TOPIC_NAME) + (shareCoordinator.foreach(coordinator => coordinator.partitionFor(SharePartitionKey.getInstance(key))), SHARE_GROUP_STATE_TOPIC_NAME) } val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) diff --git a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java index caa7b348f5e..28148eab7ff 100644 --- a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java +++ b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper; import java.util.HashSet; @@ -38,12 +39,12 @@ import scala.jdk.javaapi.OptionConverters; public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper { private final MetadataCache metadataCache; - private final Function keyToPartitionMapper; + private final Function keyToPartitionMapper; private final ListenerName interBrokerListenerName; public ShareCoordinatorMetadataCacheHelperImpl( MetadataCache metadataCache, - Function keyToPartitionMapper, + Function keyToPartitionMapper, ListenerName interBrokerListenerName ) { Objects.requireNonNull(metadataCache, "metadataCache must not be null"); @@ -61,7 +62,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinator } @Override - public Node getShareCoordinator(String key, String internalTopicName) { + public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) { if (metadataCache.contains(internalTopicName)) { Set topicSet = new HashSet<>(); topicSet.add(internalTopicName); diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 31fdc9efe81..7b78793373b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1340,6 +1340,10 @@ class KafkaApisTest extends Logging { topicConfigOverride.put(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) val groupId = "group" + val topicId = Uuid.randomUuid + val partition = 0 + var key:String = groupId + val topicName = coordinatorType match { case CoordinatorType.GROUP => @@ -1359,6 +1363,7 @@ class KafkaApisTest extends Logging { case CoordinatorType.SHARE => authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) + key = "%s:%s:%d" format(groupId, topicId, partition) Topic.SHARE_GROUP_STATE_TOPIC_NAME case _ => throw new IllegalStateException(s"Unknown coordinator type $coordinatorType") @@ -1368,12 +1373,12 @@ class KafkaApisTest extends Logging { new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(coordinatorType.id()) - .setCoordinatorKeys(asList(groupId))) + .setCoordinatorKeys(asList(key))) } else { new 
FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(coordinatorType.id()) - .setKey(groupId)) + .setKey(key)) } val request = buildRequest(findCoordinatorRequestBuilder.build(requestHeader.apiVersion)) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), @@ -1389,7 +1394,7 @@ class KafkaApisTest extends Logging { assertEquals(Errors.INVALID_REQUEST.code, response.data.coordinators.get(0).errorCode) } else if (version >= 4) { assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode) - assertEquals(groupId, response.data.coordinators.get(0).key) + assertEquals(key, response.data.coordinators.get(0).key) } else { assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode) assertTrue(capturedRequest.getValue.isEmpty) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java index 67f9d606414..dd56503dbaf 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.share.SharePartitionKey; import java.util.OptionalInt; import java.util.Properties; @@ -39,10 +40,10 @@ public interface ShareCoordinator { /** * Return the partition index for the given key. * - * @param key - groupId:topicId:partitionId. + * @param key - reference to {@link SharePartitionKey}. * @return The partition index. */ - int partitionFor(String key); + int partitionFor(SharePartitionKey key); /** * Return the configuration properties of the share-group state topic. diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index ecec6897dac..05999bd6d25 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -201,9 +201,12 @@ public class ShareCoordinatorService implements ShareCoordinator { } @Override - public int partitionFor(String key) { + public int partitionFor(SharePartitionKey key) { throwIfNotActive(); - return Utils.abs(key.hashCode()) % numPartitions; + // Call to asCoordinatorKey is necessary as we depend only on topicId (Uuid) and + // not topic name. We do not want this calculation to distinguish between 2 + // SharePartitionKeys where everything except topic name is the same. + return Utils.abs(key.asCoordinatorKey().hashCode()) % numPartitions; } @Override @@ -539,7 +542,7 @@ public class ShareCoordinatorService implements ShareCoordinator { } TopicPartition topicPartitionFor(SharePartitionKey key) { - return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key.asCoordinatorKey())); + return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key)); } private static
boolean isEmpty(List
list) { diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index a88aba632a7..c641447dd6b 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; @@ -31,6 +32,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics; @@ -52,6 +54,7 @@ import java.util.concurrent.TimeoutException; import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -637,4 +640,38 @@ class ShareCoordinatorServiceTest { assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic()); assertEquals(expectedPartition, tp.partition()); } + + @Test + public void testPartitionFor() { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM + ); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + // inactive shard should throw exception + assertThrows(CoordinatorNotAvailableException.class, () -> service.partitionFor(SharePartitionKey.getInstance(groupId, topicId, partition))); + + final int numPartitions = 50; + service.startup(() -> numPartitions); + + final SharePartitionKey key1 = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, partition, null)); + int sharePartitionKey = service.partitionFor(key1); + assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey); + + // The presence of a topic name should not affect the choice of partition + final SharePartitionKey key2 = new SharePartitionKey(groupId, new TopicIdPartition(topicId, partition, "whatever")); + sharePartitionKey = service.partitionFor(key2); + assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey); + + // asCoordinatorKey does not discriminate on topic name + assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey()); + } } diff --git a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java 
b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java index 29ce801aff3..89f555b564e 100644 --- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java +++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java @@ -65,6 +65,56 @@ public class SharePartitionKey { return getInstance(groupId, topicIdPartition.topicId(), topicIdPartition.partition()); } + + /** + * Returns a SharePartitionKey from input string of format - groupId:topicId:partition + * @param key - String in format groupId:topicId:partition + * @return object representing SharePartitionKey + * @throws IllegalArgumentException if the key is empty or has invalid format + */ + public static SharePartitionKey getInstance(String key) { + validate(key); + String[] tokens = key.split(":"); + return new SharePartitionKey( + tokens[0].trim(), + Uuid.fromString(tokens[1]), + Integer.parseInt(tokens[2]) + ); + } + + /** + * Validates whether the String argument has a valid SharePartitionKey format - groupId:topicId:partition + * @param key - String in format groupId:topicId:partition + * @throws IllegalArgumentException if the key is empty or has invalid format + */ + public static void validate(String key) { + Objects.requireNonNull(key, "Share partition key cannot be null"); + if (key.isEmpty()) { + throw new IllegalArgumentException("Share partition key cannot be empty"); + } + + String[] tokens = key.split(":"); + if (tokens.length != 3) { + throw new IllegalArgumentException("Invalid key format: expected - groupId:topicId:partition, found - " + key); + } + + if (tokens[0].trim().isEmpty()) { + throw new IllegalArgumentException("GroupId must be alphanumeric string"); + } + + try { + Uuid.fromString(tokens[1]); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid topic ID: " + tokens[1], e); + } + + try { + Integer.parseInt(tokens[2]); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid partition: " + tokens[2], e); + } + } + public static SharePartitionKey getInstance(String groupId, Uuid topicId, int partition) { return new SharePartitionKey(groupId, topicId, partition); } @@ -97,7 +147,7 @@ public class SharePartitionKey { @Override public String toString() { return "SharePartitionKey{" + - "groupId='" + groupId + + "groupId=" + groupId + ", topicIdPartition=" + topicIdPartition + '}'; } diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index 45bbf52a39d..ea14c4f59b4 100644 --- a/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -206,12 +206,10 @@ public class PersisterStateManager { */ public abstract class PersisterStateManagerHandler implements RequestCompletionHandler { protected Node coordinatorNode; - protected final String groupId; - protected final Uuid topicId; - protected final int partition; private final BackoffManager findCoordBackoff; protected final Logger log = LoggerFactory.getLogger(getClass()); private Consumer onCompleteCallback; + protected final SharePartitionKey partitionKey; public PersisterStateManagerHandler( String groupId, @@ -221,12 +219,10 @@ public class PersisterStateManager { long backoffMaxMs, int maxRPCRetryAttempts ) { - this.groupId = groupId; - this.topicId = topicId; - this.partition = partition; this.findCoordBackoff = new 
BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); this.onCompleteCallback = response -> { }; // noop + partitionKey = SharePartitionKey.getInstance(groupId, topicId, partition); } /** @@ -290,7 +286,7 @@ public class PersisterStateManager { protected AbstractRequest.Builder findShareCoordinatorBuilder() { return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() .setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id()) - .setKey(coordinatorKey())); + .setKey(partitionKey().asCoordinatorKey())); } public void addRequestToNodeMap(Node node, PersisterStateManagerHandler handler) { @@ -300,7 +296,7 @@ public class PersisterStateManager { synchronized (nodeMapLock) { nodeRPCMap.computeIfAbsent(node, k -> new HashMap<>()) .computeIfAbsent(handler.rpcType(), k -> new HashMap<>()) - .computeIfAbsent(handler.groupId, k -> new LinkedList<>()) + .computeIfAbsent(partitionKey().groupId(), k -> new LinkedList<>()) .add(handler); } sender.wakeup(); @@ -317,7 +313,7 @@ public class PersisterStateManager { } if (cacheHelper.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME)) { log.debug("{} internal topic already exists.", Topic.SHARE_GROUP_STATE_TOPIC_NAME); - Node node = cacheHelper.getShareCoordinator(coordinatorKey(), Topic.SHARE_GROUP_STATE_TOPIC_NAME); + Node node = cacheHelper.getShareCoordinator(partitionKey(), Topic.SHARE_GROUP_STATE_TOPIC_NAME); if (node != Node.noNode()) { log.debug("Found coordinator node in cache: {}", node); coordinatorNode = node; @@ -333,8 +329,8 @@ public class PersisterStateManager { * * @return String */ - protected String coordinatorKey() { - return SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + protected SharePartitionKey partitionKey() { + return partitionKey; } /** @@ -378,7 +374,7 @@ public class PersisterStateManager { findCoordBackoff.incrementAttempt(); List coordinators = ((FindCoordinatorResponse) response.responseBody()).coordinators(); if (coordinators.size() != 1) { - log.error("Find coordinator response for {} is invalid", coordinatorKey()); + log.error("Find coordinator response for {} is invalid", partitionKey()); findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new IllegalStateException("Invalid response with multiple coordinators.")); return; } @@ -399,12 +395,12 @@ public class PersisterStateManager { } break; - case COORDINATOR_NOT_AVAILABLE: // retryable error codes + case COORDINATOR_NOT_AVAILABLE: // retriable error codes case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retryable error in find coordinator: {}", error.message()); + log.warn("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message()); if (!findCoordBackoff.canAttempt()) { - log.error("Exhausted max retries to find coordinator without success."); + log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey()); findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success.")); break; } @@ -413,7 +409,7 @@ public class PersisterStateManager { break; default: - log.error("Unable to find coordinator."); + log.error("Unable to find coordinator for {} using key {}.", name(), partitionKey()); findCoordinatorErrorResponse(error, null); } } @@ -517,9 +513,9 @@ public class PersisterStateManager { WriteShareGroupStateResponse combinedResponse = (WriteShareGroupStateResponse) response.responseBody(); for (WriteShareGroupStateResponseData.WriteStateResult 
writeStateResult : combinedResponse.data().results()) { - if (writeStateResult.topicId().equals(topicId)) { + if (writeStateResult.topicId().equals(partitionKey().topicId())) { Optional partitionStateData = - writeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition) + writeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) .findFirst(); if (partitionStateData.isPresent()) { @@ -528,20 +524,20 @@ public class PersisterStateManager { case NONE: writeStateBackoff.resetAttempts(); WriteShareGroupStateResponseData.WriteStateResult result = WriteShareGroupStateResponse.toResponseWriteStateResult( - topicId, + partitionKey().topicId(), Collections.singletonList(partitionStateData.get()) ); this.result.complete(new WriteShareGroupStateResponse( new WriteShareGroupStateResponseData().setResults(Collections.singletonList(result)))); return; - // check retryable errors + // check retriable errors case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retryable error in write state RPC: {}", error.message()); + log.warn("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message()); if (!writeStateBackoff.canAttempt()) { - log.error("Exhausted max retries for write state RPC without success."); + log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey()); writeStateErrorResponse(error, new Exception("Exhausted max retries to complete write state RPC without success.")); return; } @@ -550,7 +546,7 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform write state RPC: {}", error.message()); + log.error("Unable to perform write state RPC for key {}: {}", partitionKey(), error.message()); writeStateErrorResponse(error, null); return; } @@ -560,21 +556,21 @@ public class PersisterStateManager { // no response found specific topic partition IllegalStateException exception = new IllegalStateException( - "Failed to write state for partition " + partition + " in topic " + topicId + " for group " + groupId + "Failed to write state for share partition: " + partitionKey() ); writeStateErrorResponse(Errors.forException(exception), exception); } private void writeStateErrorResponse(Errors error, Exception exception) { this.result.complete(new WriteShareGroupStateResponse( - WriteShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in write state RPC. " + + WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in write state RPC. " + (exception == null ? error.message() : exception.getMessage())))); } @Override protected void findCoordinatorErrorResponse(Errors error, Exception exception) { this.result.complete(new WriteShareGroupStateResponse( - WriteShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. " + + WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + (exception == null ? 
error.message() : exception.getMessage())))); } @@ -595,7 +591,6 @@ public class PersisterStateManager { public class ReadStateHandler extends PersisterStateManagerHandler { private final int leaderEpoch; - private final String coordinatorKey; private final CompletableFuture result; private final BackoffManager readStateBackoff; @@ -612,7 +607,6 @@ public class PersisterStateManager { ) { super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); this.leaderEpoch = leaderEpoch; - this.coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); this.result = result; this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); } @@ -660,9 +654,9 @@ public class PersisterStateManager { ReadShareGroupStateResponse combinedResponse = (ReadShareGroupStateResponse) response.responseBody(); for (ReadShareGroupStateResponseData.ReadStateResult readStateResult : combinedResponse.data().results()) { - if (readStateResult.topicId().equals(topicId)) { + if (readStateResult.topicId().equals(partitionKey().topicId())) { Optional partitionStateData = - readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition) + readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) .findFirst(); if (partitionStateData.isPresent()) { @@ -671,20 +665,20 @@ public class PersisterStateManager { case NONE: readStateBackoff.resetAttempts(); ReadShareGroupStateResponseData.ReadStateResult result = ReadShareGroupStateResponse.toResponseReadStateResult( - topicId, + partitionKey().topicId(), Collections.singletonList(partitionStateData.get()) ); this.result.complete(new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData() .setResults(Collections.singletonList(result)))); return; - // check retryable errors + // check retriable errors case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retryable error in read state RPC: {}", error.message()); + log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); if (!readStateBackoff.canAttempt()) { - log.error("Exhausted max retries for read state RPC without success."); + log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); readStateErrorReponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); return; } @@ -693,7 +687,7 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform read state RPC: {}", error.message()); + log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), error.message()); readStateErrorReponse(error, null); return; } @@ -703,21 +697,21 @@ public class PersisterStateManager { // no response found specific topic partition IllegalStateException exception = new IllegalStateException( - "Failed to read state for partition " + partition + " in topic " + topicId + " for group " + groupId + "Failed to read state for share partition " + partitionKey() ); readStateErrorReponse(Errors.forException(exception), exception); } protected void readStateErrorReponse(Errors error, Exception exception) { this.result.complete(new ReadShareGroupStateResponse( - ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. 
" + + ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + (exception == null ? error.message() : exception.getMessage())))); } @Override protected void findCoordinatorErrorResponse(Errors error, Exception exception) { this.result.complete(new ReadShareGroupStateResponse( - ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in read state RPC. " + + ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state RPC. " + (exception == null ? error.message() : exception.getMessage())))); } @@ -928,10 +922,10 @@ public class PersisterStateManager { handlers.forEach(persHandler -> { assert persHandler instanceof WriteStateHandler; WriteStateHandler handler = (WriteStateHandler) persHandler; - partitionData.computeIfAbsent(handler.topicId, topicId -> new LinkedList<>()) + partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>()) .add( new WriteShareGroupStateRequestData.PartitionData() - .setPartition(handler.partition) + .setPartition(handler.partitionKey().partition()) .setStateEpoch(handler.stateEpoch) .setLeaderEpoch(handler.leaderEpoch) .setStartOffset(handler.startOffset) @@ -959,10 +953,10 @@ public class PersisterStateManager { handlers.forEach(persHandler -> { assert persHandler instanceof ReadStateHandler; ReadStateHandler handler = (ReadStateHandler) persHandler; - partitionData.computeIfAbsent(handler.topicId, topicId -> new LinkedList<>()) + partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>()) .add( new ReadShareGroupStateRequestData.PartitionData() - .setPartition(handler.partition) + .setPartition(handler.partitionKey().partition()) .setLeaderEpoch(handler.leaderEpoch) ); }); diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java b/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java index 75e9cd62960..5a18a370c2e 100644 --- a/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java +++ b/share/src/main/java/org/apache/kafka/server/share/persister/ShareCoordinatorMetadataCacheHelper.java @@ -18,13 +18,14 @@ package org.apache.kafka.server.share.persister; import org.apache.kafka.common.Node; +import org.apache.kafka.server.share.SharePartitionKey; import java.util.List; public interface ShareCoordinatorMetadataCacheHelper { boolean containsTopic(String topic); - Node getShareCoordinator(String key, String internalTopicName); + Node getShareCoordinator(SharePartitionKey key, String internalTopicName); List getClusterNodes(); } diff --git a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index f3c285cd5ca..f2cca9c4b3f 100644 --- a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -107,7 +107,7 @@ class DefaultStatePersisterTest { } @Override - public Node getShareCoordinator(String key, String internalTopicName) { + public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) { return Node.noNode(); } diff --git 
a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java index 464dd4c0517..e410c2e3588 100644 --- a/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java +++ b/share/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java @@ -142,7 +142,7 @@ class PersisterStateManagerTest { this.result.complete(new TestHandlerResponse(new TestHandlerResponseData() .setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult() .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() - .setPartition(partition) + .setPartition(partitionKey().partition()) .setErrorMessage(Errors.NONE.message()) .setErrorCode(Errors.NONE.code())) ) @@ -159,9 +159,9 @@ class PersisterStateManagerTest { protected void findCoordinatorErrorResponse(Errors error, Exception exception) { this.result.complete(new TestHandlerResponse(new TestHandlerResponseData() .setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult() - .setTopicId(topicId) + .setTopicId(partitionKey().topicId()) .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() - .setPartition(partition) + .setPartition(partitionKey().partition()) .setErrorMessage(exception == null ? error.message() : exception.getMessage()) .setErrorCode(error.code())) ) @@ -198,7 +198,7 @@ class PersisterStateManagerTest { } @Override - public Node getShareCoordinator(String key, String internalTopicName) { + public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) { return Node.noNode(); } @@ -217,7 +217,7 @@ class PersisterStateManagerTest { } @Override - public Node getShareCoordinator(String key, String internalTopicName) { + public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) { return coordinatorNode; }
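For reference, a minimal usage sketch of the string-key handling introduced by this patch: the FIND_COORDINATOR request still carries the coordinator key as a String, and the SharePartitionKey.validate and getInstance(String) methods added above turn it back into the typed form. The class name and the sample group id are illustrative, not part of the patch.

import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.share.SharePartitionKey;

public class SharePartitionKeyParseSketch {
    public static void main(String[] args) {
        // The coordinator key travels as groupId:topicId:partition.
        // "group1" and the random topic id are made-up sample values.
        String coordinatorKey = "group1:" + Uuid.randomUuid() + ":0";

        // validate() throws IllegalArgumentException for an empty key, a key that
        // does not split into exactly three ':'-separated tokens, a blank groupId,
        // a malformed topic id, or a non-numeric partition.
        SharePartitionKey.validate(coordinatorKey);

        // getInstance(String) parses the validated key into the typed form that
        // ShareCoordinator.partitionFor and getShareCoordinator now accept.
        SharePartitionKey key = SharePartitionKey.getInstance(coordinatorKey);
        System.out.println(key);
    }
}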
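A sketch of how the share coordinator now picks a partition for a typed key, mirroring ShareCoordinatorService.partitionFor and the new testPartitionFor: the hash is taken over asCoordinatorKey() (groupId:topicId:partition), so a topic name carried on the key can never influence the chosen __share_group_state partition. The wrapper class and sample values are illustrative.

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.share.SharePartitionKey;

public class PartitionForSketch {
    // Same computation as ShareCoordinatorService.partitionFor: hash the canonical
    // coordinator-key string and fold it into the number of state topic partitions.
    static int partitionFor(SharePartitionKey key, int numPartitions) {
        return Utils.abs(key.asCoordinatorKey().hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // 50 matches the partition count used by the new test; the group id is a sample.
        SharePartitionKey key = SharePartitionKey.getInstance("group1", Uuid.randomUuid(), 0);
        System.out.println(partitionFor(key, 50));
    }
}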
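Finally, a stub implementation of the updated ShareCoordinatorMetadataCacheHelper interface, along the lines of the no-op helpers used in DefaultStatePersisterTest and PersisterStateManagerTest, showing the new SharePartitionKey-typed getShareCoordinator signature. The class name is illustrative.

import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.Node;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

// No-op helper: reports the share-group state topic as absent and returns no
// coordinator node, so PersisterStateManager falls back to the FIND_COORDINATOR RPC path.
public class NoopShareCoordinatorMetadataCacheHelper implements ShareCoordinatorMetadataCacheHelper {

    @Override
    public boolean containsTopic(String topic) {
        return false;
    }

    @Override
    public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
        return Node.noNode();
    }

    @Override
    public List<Node> getClusterNodes() {
        return Collections.emptyList();
    }
}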