From be7415cb8bf5e09ad969cccfcbffd30fd68804ea Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sat, 25 Jan 2025 23:02:28 +0800 Subject: [PATCH] KAFKA-18555 Avoid casting MetadataCache to KRaftMetadataCache (#18632) Reviewers: Ismael Juma , Chia-Ping Tsai --- ...DescribeTopicPartitionsRequestHandler.java | 8 +- .../main/scala/kafka/cluster/Partition.scala | 42 ++++------ .../main/scala/kafka/cluster/Replica.scala | 15 ++-- .../main/scala/kafka/server/KafkaApis.scala | 30 +++---- .../scala/kafka/server/MetadataCache.scala | 32 +++++++- .../server/metadata/KRaftMetadataCache.scala | 24 +----- .../kafka/cluster/PartitionLockTest.scala | 1 + .../unit/kafka/cluster/PartitionTest.scala | 78 ++++++++++++------- .../unit/kafka/server/KafkaApisTest.scala | 19 ++--- .../unit/kafka/server/MetadataCacheTest.scala | 12 +-- .../kafka/server/ReplicaManagerTest.scala | 2 + 11 files changed, 135 insertions(+), 128 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index bfadde3bfb0..a3a3e6f386a 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -20,7 +20,7 @@ package kafka.server.handlers; import kafka.network.RequestChannel; import kafka.server.AuthHelper; import kafka.server.KafkaConfig; -import kafka.server.metadata.KRaftMetadataCache; +import kafka.server.MetadataCache; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidRequestException; @@ -44,12 +44,12 @@ import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.resource.ResourceType.TOPIC; public class DescribeTopicPartitionsRequestHandler { - KRaftMetadataCache metadataCache; + MetadataCache metadataCache; AuthHelper authHelper; KafkaConfig config; public DescribeTopicPartitionsRequestHandler( - KRaftMetadataCache metadataCache, + MetadataCache metadataCache, AuthHelper authHelper, KafkaConfig config ) { @@ -104,7 +104,7 @@ public class DescribeTopicPartitionsRequestHandler { return isAuthorized; }); - DescribeTopicPartitionsResponseData response = metadataCache.getTopicMetadataForDescribeTopicResponse( + DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse( CollectionConverters.asScala(authorizedTopicsStream.iterator()), abstractRequest.context().listenerName, (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0, diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5035c86aa06..bcb029e4dd5 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -23,7 +23,6 @@ import kafka.controller.StateChangeLogger import kafka.log._ import kafka.log.remote.RemoteLogManager import kafka.server._ -import kafka.server.metadata.KRaftMetadataCache import kafka.server.share.DelayedShareFetch import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -1050,28 +1049,23 @@ class Partition(val topicPartition: TopicPartition, } private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = { - metadataCache match { - // In KRaft mode, only a replica which meets all of the following requirements is allowed to join the ISR. - // 1. It is not fenced. - // 2. It is not in controlled shutdown. - // 3. 
Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
-      //    request broker epoch is -1 which bypasses the epoch verification.
-      case kRaftMetadataCache: KRaftMetadataCache =>
-        val mayBeReplica = getReplica(followerReplicaId)
-        // The topic is already deleted and we don't have any replica information. In this case, we can return false
-        // so as to avoid NPE
-        if (mayBeReplica.isEmpty) {
-          warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might because the topic is already deleted.")
-          return false
-        }
-        val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
-        val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
-        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
-          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
-          isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
-
-      case _ => true
+    // A replica which meets all of the following requirements is allowed to join the ISR.
+    // 1. It is not fenced.
+    // 2. It is not in controlled shutdown.
+    // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
+    //    request broker epoch is -1 which bypasses the epoch verification.
+    val mayBeReplica = getReplica(followerReplicaId)
+    // The topic is already deleted and we don't have any replica information. In this case, we can return false
+    // so as to avoid an NPE.
+    if (mayBeReplica.isEmpty) {
+      warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")
+      return false
     }
+    val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
+    val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(followerReplicaId)
+    !metadataCache.isBrokerFenced(followerReplicaId) &&
+      !metadataCache.isBrokerShuttingDown(followerReplicaId) &&
+      isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
   }
 
   private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
@@ -1793,9 +1787,7 @@ class Partition(val topicPartition: TopicPartition,
   private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
     isr.map { brokerId =>
       val brokerState = new BrokerState().setBrokerId(brokerId)
-      if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
-        brokerState.setBrokerEpoch(-1)
-      } else if (brokerId == localBrokerId) {
+      if (brokerId == localBrokerId) {
         brokerState.setBrokerEpoch(localBrokerEpochSupplier())
       } else {
         val replica = remoteReplicasMap.get(brokerId)
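The eligibility rule above is now applied to every replica rather than only under a KRaftMetadataCache match. A minimal, self-contained Scala sketch of the predicate the hunk encodes; the parameter names are illustrative stand-ins for the metadata-cache lookups, not Kafka APIs:

    // Requirements: (1) not fenced, (2) not in controlled shutdown,
    // (3) a stored broker epoch of -1 bypasses verification, otherwise it must match the cache.
    def isrEligible(fenced: Boolean, shuttingDown: Boolean,
                    storedEpoch: Option[Long], cachedEpoch: Option[Long]): Boolean = {
      val epochOk = storedEpoch.exists(e => e == -1L || cachedEpoch.contains(e))
      !fenced && !shuttingDown && epochOk
    }

    isrEligible(fenced = false, shuttingDown = false, Some(-1L), Some(7L)) // true: -1 bypasses the check
    isrEligible(fenced = false, shuttingDown = false, Some(5L), Some(7L)) // false: stale broker epoch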
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 9a88544562d..5a2fb1abe06 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,7 +19,6 @@ package kafka.cluster
 
 import kafka.log.UnifiedLog
 import kafka.server.MetadataCache
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
@@ -113,15 +112,11 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
     brokerEpoch: Long
   ): Unit = {
     replicaState.updateAndGet { currentReplicaState =>
-      metadataCache match {
-        case kRaftMetadataCache: KRaftMetadataCache =>
-          val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(brokerId)
-          // Fence the update if it provides a stale broker epoch.
-          if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
-            throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
-              s"vs expected=${cachedBrokerEpoch.get}")
-          }
-        case _ =>
+      val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
+      // Fence the update if it provides a stale broker epoch.
+      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+        throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
+          s"vs expected=${cachedBrokerEpoch.get}")
       }
 
       val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8d0d62667f2..11623b02a3d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
+import kafka.server.metadata.ConfigRepository
 import kafka.server.share.SharePartitionManager
 import kafka.utils.Logging
 import org.apache.kafka.admin.AdminUtils
@@ -113,11 +113,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
   val configManager = new ConfigAdminManager(brokerId, config, configRepository)
-  val describeTopicPartitionsRequestHandler : Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
-    case kRaftMetadataCache: KRaftMetadataCache =>
-      Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
-    case _ => None
-  }
+  val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
+    metadataCache, authHelper, config)
 
   def close(): Unit = {
     aclApis.close()
@@ -967,21 +964,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
-    describeTopicPartitionsRequestHandler match {
-      case Some(handler) => {
-        val response = handler.handleDescribeTopicPartitionsRequest(request)
-        trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
-          request.header.correlationId, request.header.clientId))
+    val response = describeTopicPartitionsRequestHandler.handleDescribeTopicPartitionsRequest(request)
+    trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
+      request.header.correlationId, request.header.clientId))
 
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          response.setThrottleTimeMs(requestThrottleMs)
-          new DescribeTopicPartitionsResponse(response)
-        })
-      }
-      case None => {
-        requestHelper.sendMaybeThrottle(request, request.body[DescribeTopicPartitionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-      }
-    }
+    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+      response.setThrottleTimeMs(requestThrottleMs)
+      new DescribeTopicPartitionsResponse(response)
+    })
   }
 
   /**
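With the handler constructed unconditionally, handleDescribeTopicPartitionsRequest above loses its UNSUPPORTED_VERSION fallback, and the same cast-removal shape repeats in Partition and Replica. A toy, self-contained Scala model of the refactoring, using hypothetical Cache/KRaftCache types rather than the real classes:

    // Before: a KRaft-only member forces call sites to downcast.
    trait OldCache
    final class OldKRaftCache extends OldCache { def aliveEpoch(id: Int): Option[Long] = Some(42L) }
    def epochBefore(c: OldCache): Option[Long] = c match {
      case k: OldKRaftCache => k.aliveEpoch(1)
      case _                => None // non-KRaft fallback, now dead code
    }

    // After: the member is declared on the trait, so callers invoke it directly.
    trait Cache { def aliveEpoch(id: Int): Option[Long] }
    final class KRaftCache extends Cache { override def aliveEpoch(id: Int): Option[Long] = Some(42L) }
    def epochAfter(c: Cache): Option[Long] = c.aliveEpoch(1)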
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 67ecbbead6a..703f2d8e320 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
+import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderAndIsr
@@ -56,6 +56,12 @@ trait MetadataCache extends ConfigRepository {
 
   def getAliveBrokers(): Iterable[BrokerMetadata]
 
+  def getAliveBrokerEpoch(brokerId: Int): Option[Long]
+
+  def isBrokerFenced(brokerId: Int): Boolean
+
+  def isBrokerShuttingDown(brokerId: Int): Boolean
+
   def getTopicId(topicName: String): Uuid
 
   def getTopicName(topicId: Uuid): Option[String]
@@ -105,6 +111,30 @@ trait MetadataCache extends ConfigRepository {
   def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData
 
   def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData
+
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition that can't be returned due to the limit.
+   * If the quota is reached before any partition of a topic can be returned, that topic is not included in the response.
+   *
+   * Note that the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics The iterator of topics and their corresponding first partition id to fetch.
+   * @param listenerName The listener name.
+   * @param topicPartitionStartIndex The start partition index for the first topic.
+   * @param maximumNumberOfPartitions The maximum number of partitions to return.
+   * @param ignoreTopicsWithExceptions Whether to ignore topics with exceptions.
+   */
+  def describeTopicResponse(
+    topics: Iterator[String],
+    listenerName: ListenerName,
+    topicPartitionStartIndex: String => Int,
+    maximumNumberOfPartitions: Int,
+    ignoreTopicsWithExceptions: Boolean
+  ): DescribeTopicPartitionsResponseData
 }
 
 object MetadataCache {
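To make the quota and cursor semantics concrete, a hedged sketch of a call against an already-populated cache (construction elided; topics "a" and "b" with three partitions each are assumed, mirroring the MetadataCacheTest expectations later in this patch):

    val resp = cache.describeTopicResponse(
      Iterator("a", "b"), // topics, already sorted alphabetically
      listenerName,       // listener the client connected on
      _ => 0,             // topicPartitionStartIndex: start at partition 0 for every topic
      2,                  // maximumNumberOfPartitions: the quota
      false)              // ignoreTopicsWithExceptions
    // resp contains only topic "a" with partitions 0 and 1; topic "b" is omitted
    // because the quota was exhausted, and resp.nextCursor() points at the first
    // partition that did not fit ("a", partition 2).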
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index be13635c1ab..4b6fa5f3a5f 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -256,23 +256,7 @@ class KRaftMetadataCache(
     }
   }
 
-  /**
-   * Get the topic metadata for the given topics.
-   *
-   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
-   * partition can't be returned due the limit.
-   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
-   *
-   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
-   * will also be sorted in alphabetical order.
-   *
-   * @param topics The iterator of topics and their corresponding first partition id to fetch.
-   * @param listenerName The listener name.
-   * @param topicPartitionStartIndex The start partition index for the first topic
-   * @param maximumNumberOfPartitions The max number of partitions to return.
-   * @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
-   */
-  def getTopicMetadataForDescribeTopicResponse(
+  override def describeTopicResponse(
     topics: Iterator[String],
     listenerName: ListenerName,
     topicPartitionStartIndex: String => Int,
     maximumNumberOfPartitions: Int,
@@ -353,11 +337,11 @@ class KRaftMetadataCache(
     Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
   }
 
-  def isBrokerFenced(brokerId: Int): Boolean = {
+  override def isBrokerFenced(brokerId: Int): Boolean = {
     Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
   }
 
-  def isBrokerShuttingDown(brokerId: Int): Boolean = {
+  override def isBrokerShuttingDown(brokerId: Int): Boolean = {
     Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
   }
 
@@ -452,7 +436,7 @@ class KRaftMetadataCache(
     }
   }
 
-  def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
+  override def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
     Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
       map(brokerRegistration => brokerRegistration.epoch())
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 0015fca546a..f3a949c22ec 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -348,6 +348,7 @@ class PartitionLockTest extends Logging {
     val controllerEpoch = 0
     val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
     val isr = replicas
+    replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Some(1L)))
 
     assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(controllerEpoch)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index d634fc3442c..f52e356b057 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -186,6 +186,7 @@ class PartitionTest extends AbstractPartitionTest {
     val leaderEpoch = 10
     val logStartOffset = 0L
     val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
+    addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId))
 
     def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = {
       new FetchResponseData.EpochEndOffset()
@@ -354,6 +355,7 @@ class PartitionTest extends AbstractPartitionTest {
     val isr = List[Integer](leader, validReplica).asJava
     val leaderEpoch = 8
     val partitionEpoch = 1
+    addBrokerEpochToMockMetadataCache(metadataCache, List(leader, addingReplica1, addingReplica2))
 
     assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(controllerEpoch)
@@ -641,6 +643,7 @@ class PartitionTest extends AbstractPartitionTest {
     val leaderEpoch =
5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId)) def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = { fetchFollower( partition, @@ -788,6 +791,7 @@ class PartitionTest extends AbstractPartitionTest { val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes), new SimpleRecord(20,"k4".getBytes, "v2".getBytes), new SimpleRecord(21,"k5".getBytes, "v3".getBytes))) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -1121,7 +1125,7 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = List[Integer](leader, follower1, follower2).asJava + val replicas = Seq(leader, follower1, follower2) val isr = List[Integer](leader, follower2).asJava val leaderEpoch = 8 val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes), @@ -1131,6 +1135,7 @@ class PartitionTest extends AbstractPartitionTest { new SimpleRecord("k5".getBytes, "v3".getBytes))) val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes), new SimpleRecord("k7".getBytes, "v2".getBytes))) + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -1138,7 +1143,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas) + .setReplicas(replicas.map(Int.box).asJava) .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, None), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") @@ -1165,7 +1170,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas) + .setReplicas(replicas.map(Int.box).asJava) .setIsNew(false) partition.makeFollower(followerState, offsetCheckpoints, None) @@ -1175,7 +1180,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas) + .setReplicas(replicas.map(Int.box).asJava) .setIsNew(false) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None), "Expected makeLeader() to return 'leader changed' after makeFollower()") @@ -1311,8 +1316,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List[Integer](brokerId, remoteBrokerId).asJava + val replicas = Seq(brokerId, remoteBrokerId) val isr = replicas + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1322,9 +1328,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr) + .setIsr(isr.map(Int.box).asJava) .setPartitionEpoch(1) - .setReplicas(replicas) + .setReplicas(replicas.map(Int.box).asJava) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -1355,7 +1361,6 @@ class PartitionTest extends AbstractPartitionTest { @Test def 
testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = { - val mockMetadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache]) val partition = spy(new Partition(topicPartition, replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT, localBrokerId = brokerId, @@ -1363,7 +1368,7 @@ class PartitionTest extends AbstractPartitionTest { time, alterPartitionListener, delayedOperations, - mockMetadataCache, + metadataCache, logManager, alterPartitionManager)) @@ -1376,7 +1381,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 val replicas = List(brokerId, remoteBrokerId) - addBrokerEpochToMockMetadataCache(mockMetadataCache, replicas) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1395,7 +1400,7 @@ class PartitionTest extends AbstractPartitionTest { doAnswer(_ => { // simulate topic is deleted at the moment partition.delete() - val replica = new Replica(remoteBrokerId, topicPartition, mockMetadataCache) + val replica = new Replica(remoteBrokerId, topicPartition, metadataCache) partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, defaultBrokerEpoch(remoteBrokerId)) mock(classOf[LogReadInfo]) }).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean()) @@ -1411,8 +1416,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List[Integer](brokerId, remoteBrokerId).asJava + val replicas = Seq(brokerId, remoteBrokerId) val isr = List[Integer](brokerId).asJava + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1422,7 +1428,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas) + .setReplicas(replicas.map(Int.box).asJava) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1464,6 +1470,7 @@ class PartitionTest extends AbstractPartitionTest { val remoteBrokerId = brokerId + 1 val replicas = List(brokerId, remoteBrokerId) val isr = List[Integer](brokerId).asJava + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1497,8 +1504,8 @@ class PartitionTest extends AbstractPartitionTest { val isrItem = alterPartitionManager.isrUpdates.head assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava) isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => - // the broker epochs in the leaderAndIsr should be -1. 
-        assertEquals(-1, brokerState.brokerEpoch())
+        // each broker epoch should match the epoch cached for that replica
+        assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch())
       }
       assertEquals(Set(brokerId), partition.partitionState.isr)
       assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
@@ -1525,8 +1532,9 @@
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
-    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val replicas = Seq(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId).asJava
+    addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
 
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
     assertTrue(partition.makeLeader(
@@ -1536,7 +1544,7 @@
         .setLeaderEpoch(leaderEpoch)
         .setIsr(isr)
         .setPartitionEpoch(1)
-        .setReplicas(replicas)
+        .setReplicas(replicas.map(Int.box).asJava)
         .setIsNew(true),
       offsetCheckpoints, None), "Expected become leader transition to succeed")
     assertEquals(Set(brokerId), partition.partitionState.isr)
@@ -1583,7 +1591,6 @@
     val replicas = List(brokerId, remoteBrokerId)
     val shrinkedIsr = Set(brokerId)
 
-    val metadataCache = mock(classOf[KRaftMetadataCache])
     addBrokerEpochToMockMetadataCache(metadataCache, replicas)
 
     val partition = new Partition(
@@ -1684,7 +1691,6 @@
     val replicas = List(brokerId, remoteBrokerId)
     val isr = Set(brokerId)
 
-    val metadataCache = mock(classOf[KRaftMetadataCache])
     addBrokerEpochToMockMetadataCache(metadataCache, replicas)
 
     // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
@@ -1887,7 +1893,6 @@ class PartitionTest extends AbstractPartitionTest { val replicas = List(brokerId, remoteBrokerId1) val isr = Set(brokerId, remoteBrokerId1) - val metadataCache = mock(classOf[KRaftMetadataCache]) addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( @@ -1953,7 +1958,6 @@ class PartitionTest extends AbstractPartitionTest { val replicas = List(brokerId, remoteBrokerId) val isr = Set(brokerId) - val metadataCache = mock(classOf[KRaftMetadataCache]) addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( topicPartition, @@ -2189,7 +2193,6 @@ class PartitionTest extends AbstractPartitionTest { val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) val isr = Seq(brokerId, remoteBrokerId1) - val metadataCache = mock(classOf[KRaftMetadataCache]) addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) val partition = new Partition( @@ -2304,6 +2307,7 @@ class PartitionTest extends AbstractPartitionTest { val replicas = Seq(brokerId, remoteBrokerId) val isr = Seq(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) assertTrue(makeLeader( topicId = None, @@ -2363,6 +2367,7 @@ class PartitionTest extends AbstractPartitionTest { val replicas = Seq(brokerId, remoteBrokerId) val isr = Seq(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) assertTrue(makeLeader( topicId = None, @@ -2496,6 +2501,7 @@ class PartitionTest extends AbstractPartitionTest { val remoteBrokerId = brokerId + 1 val replicas = Seq(brokerId, remoteBrokerId) val isr = Seq(brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) assertTrue(makeLeader( topicId = None, @@ -2592,6 +2598,7 @@ class PartitionTest extends AbstractPartitionTest { val replicas = Seq(brokerId, follower1, follower2, follower3) val isr = Seq(brokerId, follower1, follower2) val partitionEpoch = 1 + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2647,6 +2654,7 @@ class PartitionTest extends AbstractPartitionTest { val follower3 = brokerId + 3 val replicas = Seq(brokerId, follower1, follower2, follower3) val isr = Seq(brokerId, follower1, follower2) + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -3026,6 +3034,7 @@ class PartitionTest extends AbstractPartitionTest { val replicas = List(leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val initialLeaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3259,13 +3268,16 @@ class PartitionTest extends AbstractPartitionTest { def testAddAndRemoveListeners(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None) + val replicas = Seq(brokerId, brokerId + 1) + val isr = replicas + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(isr.map(Int.box).asJava) + .setReplicas(replicas.map(Int.box).asJava) 
.setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3350,13 +3362,16 @@ class PartitionTest extends AbstractPartitionTest { def testPartitionListenerWhenLogOffsetsChanged(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None) + val replicas = Seq(brokerId, brokerId + 1) + val isr = Seq(brokerId, brokerId + 1) + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(isr.map(Int.box).asJava) + .setReplicas(replicas.map(Int.box).asJava) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3442,13 +3457,16 @@ class PartitionTest extends AbstractPartitionTest { partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None) assertTrue(partition.log.isDefined) + val replicas = Seq(brokerId, brokerId + 1) + val isr = replicas + addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(isr.map(Int.box).asJava) + .setReplicas(replicas.map(Int.box).asJava) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3730,9 +3748,9 @@ class PartitionTest extends AbstractPartitionTest { ) } - private def addBrokerEpochToMockMetadataCache(kRaftMetadataCache: KRaftMetadataCache, brokers: List[Int]): Unit = { + private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: List[Int]): Unit = { brokers.foreach { broker => - when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker))) + when(metadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker))) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 901309b289d..18930be5ee6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -212,18 +212,13 @@ class KafkaApisTest extends Logging { private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = { if (featureVersions.isEmpty) return - metadataCache match { - case cache: KRaftMetadataCache => - when(cache.features()).thenReturn { - new FinalizedFeatures( - MetadataVersion.latestTesting, - featureVersions.map { featureVersion => - featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short] - }.toMap.asJava, - 0) - } - - case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache") + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting, + featureVersions.map { featureVersion => + featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short] + }.toMap.asJava, + 0) } } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 285a5dded31..dec55bafd0c 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -709,7 +709,7 @@ class MetadataCacheTest { } @Test - def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = { + def testDescribeTopicResponse(): Unit = { val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) val securityProtocol = SecurityProtocol.PLAINTEXT @@ -810,7 +810,7 @@ class MetadataCacheTest { } // Basic test - var result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList + var result = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList assertEquals(2, result.size) var resultTopic = result(0) assertEquals(topic0, resultTopic.name()) @@ -827,7 +827,7 @@ class MetadataCacheTest { checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala) // Quota reached - var response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false) + var response = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false) result = response.topics().asScala.toList assertEquals(1, result.size) resultTopic = result(0) @@ -840,7 +840,7 @@ class MetadataCacheTest { assertEquals(2, response.nextCursor().partitionIndex()) // With start index - result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList + result = metadataCache.describeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList assertEquals(1, result.size) resultTopic = result(0) assertEquals(topic0, resultTopic.name()) @@ -850,7 +850,7 @@ class MetadataCacheTest { checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala) // With start index and quota reached - response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false) + response = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false) result = response.topics().asScala.toList assertEquals(1, result.size) @@ -864,7 +864,7 @@ class MetadataCacheTest { assertEquals(0, response.nextCursor().partitionIndex()) // When the first topic does not exist - result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList + result = metadataCache.describeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList assertEquals(2, result.size) resultTopic = result(0) assertEquals("Non-exist", resultTopic.name()) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a6aa6f4b5ee..86dfba59c9d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2717,6 +2717,7 @@ class ReplicaManagerTest { thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap) 
     when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    when(metadataCache.getAliveBrokerEpoch(leaderBrokerId)).thenReturn(Some(brokerEpoch))
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       "Produce", timer, 0, false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -3141,6 +3142,7 @@ class ReplicaManagerTest {
     when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
     when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
+    when(metadataCache.getAliveBrokerEpoch(brokerId + 1)).thenReturn(Some(brokerEpoch))
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       "Produce", timer, 0, false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
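Because these lookups now run unconditionally, every test that mocks MetadataCache has to stub getAliveBrokerEpoch, as the hunks above do. A minimal sketch of the pattern, assuming mockito-core on the classpath and illustrative broker ids and epochs:

    import org.mockito.Mockito.{mock, when}

    val metadataCache = mock(classOf[kafka.server.MetadataCache])
    // Unstubbed methods returning AnyRef yield null, so the epoch lookup needs
    // an explicit Option; the boolean checks default to false, i.e. "alive".
    when(metadataCache.getAliveBrokerEpoch(1)).thenReturn(Some(100L))
    when(metadataCache.isBrokerFenced(1)).thenReturn(false)
    when(metadataCache.isBrokerShuttingDown(1)).thenReturn(false)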