KAFKA-18555 Avoid casting MetadataCache to KRaftMetadataCache (#18632)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
PoAn Yang 2025-01-25 23:02:28 +08:00 committed by GitHub
parent c774352c35
commit be7415cb8b
11 changed files with 135 additions and 128 deletions
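
Every file below repeats one pattern: getAliveBrokerEpoch, isBrokerFenced, isBrokerShuttingDown, and the describe-topic lookup (renamed from getTopicMetadataForDescribeTopicResponse to describeTopicResponse) move from KRaftMetadataCache up to the MetadataCache trait, so call sites make an ordinary virtual call instead of pattern-matching on the concrete cache type. A minimal sketch of the before/after shape, with stand-in bodies rather than the real Kafka classes:

    // Stand-in sketch of the refactoring pattern; only the names mirror Kafka.
    trait MetadataCache {
      def getAliveBrokerEpoch(brokerId: Int): Option[Long] // now declared on the trait
    }
    class KRaftMetadataCache extends MetadataCache {
      override def getAliveBrokerEpoch(brokerId: Int): Option[Long] = Some(42L)
    }
    object Sketch {
      // Before: a runtime type test with a fallback branch for non-KRaft caches.
      def epochBefore(cache: MetadataCache, id: Int): Option[Long] = cache match {
        case kraft: KRaftMetadataCache => kraft.getAliveBrokerEpoch(id)
        case _ => None
      }
      // After: a plain call through the trait.
      def epochAfter(cache: MetadataCache, id: Int): Option[Long] =
        cache.getAliveBrokerEpoch(id)
    }

With brokers now always KRaft-backed, the fallback branches (case _ => true, case _ => None, the UNSUPPORTED_VERSION path in KafkaApis) were unreachable, which is what makes deleting them safe.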

View File

@@ -20,7 +20,7 @@ package kafka.server.handlers;
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;
-import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.MetadataCache;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -44,12 +44,12 @@ import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
public class DescribeTopicPartitionsRequestHandler {
-KRaftMetadataCache metadataCache;
+MetadataCache metadataCache;
AuthHelper authHelper;
KafkaConfig config;
public DescribeTopicPartitionsRequestHandler(
-KRaftMetadataCache metadataCache,
+MetadataCache metadataCache,
AuthHelper authHelper,
KafkaConfig config
) {
@@ -104,7 +104,7 @@ public class DescribeTopicPartitionsRequestHandler {
return isAuthorized;
});
-DescribeTopicPartitionsResponseData response = metadataCache.getTopicMetadataForDescribeTopicResponse(
+DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
CollectionConverters.asScala(authorizedTopicsStream.iterator()),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,

View File

@@ -23,7 +23,6 @@ import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.server._
-import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.DelayedShareFetch
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
@@ -1050,28 +1049,23 @@ class Partition(val topicPartition: TopicPartition,
}
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
-metadataCache match {
-// In KRaft mode, only a replica which meets all of the following requirements is allowed to join the ISR.
-// 1. It is not fenced.
-// 2. It is not in controlled shutdown.
-// 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
-// request broker epoch is -1 which bypasses the epoch verification.
-case kRaftMetadataCache: KRaftMetadataCache =>
-val mayBeReplica = getReplica(followerReplicaId)
-// The topic is already deleted and we don't have any replica information. In this case, we can return false
-// so as to avoid NPE
-if (mayBeReplica.isEmpty) {
-warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might because the topic is already deleted.")
-return false
-}
-val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
-val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
-!kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
-!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
-isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
-case _ => true
+// A replica which meets all of the following requirements is allowed to join the ISR.
+// 1. It is not fenced.
+// 2. It is not in controlled shutdown.
+// 3. Its metadata cached broker epoch matches its Fetch request broker epoch, or the Fetch
+// request broker epoch is -1, which bypasses the epoch verification.
+val mayBeReplica = getReplica(followerReplicaId)
+// The topic is already deleted and we don't have any replica information. In this case, we can return false
+// so as to avoid an NPE.
+if (mayBeReplica.isEmpty) {
+warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")
+return false
+}
+val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
+val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(followerReplicaId)
+!metadataCache.isBrokerFenced(followerReplicaId) &&
+!metadataCache.isBrokerShuttingDown(followerReplicaId) &&
+isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
}
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
@@ -1793,9 +1787,7 @@ class Partition(val topicPartition: TopicPartition,
private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
isr.map { brokerId =>
val brokerState = new BrokerState().setBrokerId(brokerId)
-if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
-brokerState.setBrokerEpoch(-1)
-} else if (brokerId == localBrokerId) {
+if (brokerId == localBrokerId) {
brokerState.setBrokerEpoch(localBrokerEpochSupplier())
} else {
val replica = remoteReplicasMap.get(brokerId)
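
Requirement 3 in the comment above is the subtle one: the broker epoch stored from the follower's Fetch request must match the epoch the leader's metadata cache holds for that follower, with -1 acting as an explicit bypass. A sketch of the predicate under that reading (the real isBrokerEpochIsrEligible body sits outside the displayed hunks, so this is an assumption, not the actual source):

    // Assumed shape of the epoch-eligibility check referenced above.
    private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long],
                                         cachedBrokerEpoch: Option[Long]): Boolean =
      storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
        (storedBrokerEpoch.get == -1 ||          // -1 bypasses the verification
          storedBrokerEpoch.get == cachedBrokerEpoch.get)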

View File

@@ -19,7 +19,6 @@ package kafka.cluster
import kafka.log.UnifiedLog
import kafka.server.MetadataCache
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
@@ -113,15 +112,11 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
brokerEpoch: Long
): Unit = {
replicaState.updateAndGet { currentReplicaState =>
-metadataCache match {
-case kRaftMetadataCache: KRaftMetadataCache =>
-val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(brokerId)
-// Fence the update if it provides a stale broker epoch.
-if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
-throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
-s"vs expected=${cachedBrokerEpoch.get}")
-}
-case _ =>
+val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
+// Fence the update if it provides a stale broker epoch.
+if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
+s"vs expected=${currentReplicaState.brokerEpoch.get}")
+}
val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
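
The guard above is easiest to read with concrete numbers: an update is fenced only when it carries a real epoch (not -1) that is older than the cached one. A small worked example with assumed values:

    // Hypothetical values walking through the fencing condition above.
    val brokerEpoch = 5L                 // epoch carried by the fetch state update
    val cachedBrokerEpoch = Option(7L)   // metadataCache.getAliveBrokerEpoch(brokerId)
    // 5 != -1 and 7 > 5, so this update throws NotLeaderOrFollowerException.
    assert(brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch))
    // An update at epoch 7, or at the bypass value -1, would be accepted.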

View File

@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
+import kafka.server.metadata.ConfigRepository
import kafka.server.share.SharePartitionManager
import kafka.utils.Logging
import org.apache.kafka.admin.AdminUtils
@@ -113,11 +113,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
val configManager = new ConfigAdminManager(brokerId, config, configRepository)
-val describeTopicPartitionsRequestHandler : Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
-case kRaftMetadataCache: KRaftMetadataCache =>
-Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
-case _ => None
-}
+val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
+metadataCache, authHelper, config)
def close(): Unit = {
aclApis.close()
@@ -967,21 +964,14 @@
}
def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
-describeTopicPartitionsRequestHandler match {
-case Some(handler) => {
-val response = handler.handleDescribeTopicPartitionsRequest(request)
-trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
-request.header.correlationId, request.header.clientId))
+val response = describeTopicPartitionsRequestHandler.handleDescribeTopicPartitionsRequest(request)
+trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
+request.header.correlationId, request.header.clientId))
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-response.setThrottleTimeMs(requestThrottleMs)
-new DescribeTopicPartitionsResponse(response)
-})
-}
-case None => {
-requestHelper.sendMaybeThrottle(request, request.body[DescribeTopicPartitionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-}
-}
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.setThrottleTimeMs(requestThrottleMs)
+new DescribeTopicPartitionsResponse(response)
+})
}
/**

View File

@@ -19,7 +19,7 @@ package kafka.server
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
+import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr
@@ -56,6 +56,12 @@ trait MetadataCache extends ConfigRepository {
def getAliveBrokers(): Iterable[BrokerMetadata]
+def getAliveBrokerEpoch(brokerId: Int): Option[Long]
+def isBrokerFenced(brokerId: Int): Boolean
+def isBrokerShuttingDown(brokerId: Int): Boolean
def getTopicId(topicName: String): Uuid
def getTopicName(topicId: Uuid): Option[String]
@@ -105,6 +111,30 @@
def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData
def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData
+/**
+* Get the topic metadata for the given topics.
+*
+* The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+* partition that can't be returned due to the limit.
+* If a topic can't return any partitions because the quota limit has been reached, that topic will not be included in the response.
+*
+* Note that the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+* will also be sorted in alphabetical order.
+*
+* @param topics The iterator of topics and their corresponding first partition id to fetch.
+* @param listenerName The listener name.
+* @param topicPartitionStartIndex The start partition index for the first topic.
+* @param maximumNumberOfPartitions The max number of partitions to return.
+* @param ignoreTopicsWithExceptions Whether to ignore topics with exceptions.
+*/
+def describeTopicResponse(
+topics: Iterator[String],
+listenerName: ListenerName,
+topicPartitionStartIndex: String => Int,
+maximumNumberOfPartitions: Int,
+ignoreTopicsWithExceptions: Boolean
+): DescribeTopicPartitionsResponseData
}
object MetadataCache {
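
The contract is easiest to see from a call. A representative invocation, mirroring the MetadataCacheTest changes later in this diff (topic0, topic1, and listenerName are the test's fixtures):

    // Ask for at most 2 partitions in total, starting at partition 0 of each topic.
    val response = metadataCache.describeTopicResponse(
      Seq(topic0, topic1).iterator, // sorted alphabetically, per the contract
      listenerName,
      _ => 0, // start partition index for each topic
      2,      // maximumNumberOfPartitions: the quota
      false)  // ignoreTopicsWithExceptions
    // If topic0 alone exhausts the quota, topic1 is omitted and nextCursor()
    // points at the first partition that did not fit (partition 2 in the test).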

View File

@@ -256,23 +256,7 @@ class KRaftMetadataCache(
}
}
-/**
-* Get the topic metadata for the given topics.
-*
-* The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
-* partition can't be returned due the limit.
-* If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
-*
-* Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
-* will also be sorted in alphabetical order.
-*
-* @param topics The iterator of topics and their corresponding first partition id to fetch.
-* @param listenerName The listener name.
-* @param topicPartitionStartIndex The start partition index for the first topic
-* @param maximumNumberOfPartitions The max number of partitions to return.
-* @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
-*/
-def getTopicMetadataForDescribeTopicResponse(
+override def describeTopicResponse(
topics: Iterator[String],
listenerName: ListenerName,
topicPartitionStartIndex: String => Int,
@@ -353,11 +337,11 @@
Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
}
-def isBrokerFenced(brokerId: Int): Boolean = {
+override def isBrokerFenced(brokerId: Int): Boolean = {
Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
}
-def isBrokerShuttingDown(brokerId: Int): Boolean = {
+override def isBrokerShuttingDown(brokerId: Int): Boolean = {
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
}
@@ -452,7 +436,7 @@
}
}
-def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
+override def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
map(brokerRegistration => brokerRegistration.epoch())
}

View File

@@ -348,6 +348,7 @@ class PartitionLockTest extends Logging {
val controllerEpoch = 0
val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
val isr = replicas
+replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Some(1L)))
assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(controllerEpoch)

View File

@@ -186,6 +186,7 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpoch = 10
val logStartOffset = 0L
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
+addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId))
def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = {
new FetchResponseData.EpochEndOffset()
@@ -354,6 +355,7 @@
val isr = List[Integer](leader, validReplica).asJava
val leaderEpoch = 8
val partitionEpoch = 1
+addBrokerEpochToMockMetadataCache(metadataCache, List(leader, addingReplica1, addingReplica2))
assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(controllerEpoch)
@@ -641,6 +643,7 @@
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId))
def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = {
fetchFollower(
partition,
@@ -788,6 +791,7 @@
val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
+addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val leaderState = new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(controllerEpoch)
@@ -1121,7 +1125,7 @@
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
-val replicas = List[Integer](leader, follower1, follower2).asJava
+val replicas = Seq(leader, follower1, follower2)
val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8
val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
@@ -1131,6 +1135,7 @@
new SimpleRecord("k5".getBytes, "v3".getBytes)))
val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
new SimpleRecord("k7".getBytes, "v2".getBytes)))
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
val leaderState = new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(controllerEpoch)
@@ -1138,7 +1143,7 @@
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, None), "Expected first makeLeader() to return 'leader changed'")
assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
@@ -1165,7 +1170,7 @@
.setLeaderEpoch(leaderEpoch + 1)
.setIsr(isr)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
partition.makeFollower(followerState, offsetCheckpoints, None)
@@ -1175,7 +1180,7 @@
.setLeaderEpoch(leaderEpoch + 2)
.setIsr(isr)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None),
"Expected makeLeader() to return 'leader changed' after makeFollower()")
@@ -1311,8 +1316,9 @@
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
-val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+val replicas = Seq(brokerId, remoteBrokerId)
val isr = replicas
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -1322,9 +1328,9 @@
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
-.setIsr(isr)
+.setIsr(isr.map(Int.box).asJava)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
@@ -1355,7 +1361,6 @@
@Test
def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
-val mockMetadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache])
val partition = spy(new Partition(topicPartition,
replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
localBrokerId = brokerId,
@@ -1363,7 +1368,7 @@
time,
alterPartitionListener,
delayedOperations,
-mockMetadataCache,
+metadataCache,
logManager,
alterPartitionManager))
@@ -1376,7 +1381,7 @@
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
-addBrokerEpochToMockMetadataCache(mockMetadataCache, replicas)
+addBrokerEpochToMockMetadataCache(metadataCache, replicas)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -1395,7 +1400,7 @@
doAnswer(_ => {
// simulate topic is deleted at the moment
partition.delete()
-val replica = new Replica(remoteBrokerId, topicPartition, mockMetadataCache)
+val replica = new Replica(remoteBrokerId, topicPartition, metadataCache)
partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, defaultBrokerEpoch(remoteBrokerId))
mock(classOf[LogReadInfo])
}).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean())
@@ -1411,8 +1416,9 @@
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
-val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+val replicas = Seq(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
@@ -1422,7 +1428,7 @@
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(Set(brokerId), partition.partitionState.isr)
@@ -1464,6 +1470,7 @@
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava
+addBrokerEpochToMockMetadataCache(metadataCache, replicas)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
@@ -1497,8 +1504,8 @@
val isrItem = alterPartitionManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava)
isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
-// the broker epochs in the leaderAndIsr should be -1.
-assertEquals(-1, brokerState.brokerEpoch())
+// each broker's epoch in the leaderAndIsr should match the epoch stubbed in the metadata cache
+assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch())
}
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
@@ -1525,8 +1532,9 @@
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
-val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+val replicas = Seq(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
@@ -1536,7 +1544,7 @@
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
-.setReplicas(replicas)
+.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(Set(brokerId), partition.partitionState.isr)
@@ -1583,7 +1591,6 @@
val replicas = List(brokerId, remoteBrokerId)
val shrinkedIsr = Set(brokerId)
-val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
@@ -1684,7 +1691,6 @@
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
-val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
// Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
@@ -1887,7 +1893,6 @@
val replicas = List(brokerId, remoteBrokerId1)
val isr = Set(brokerId, remoteBrokerId1)
-val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
@@ -1953,7 +1958,6 @@
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
-val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
topicPartition,
@@ -2189,7 +2193,6 @@
val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2)
val isr = Seq(brokerId, remoteBrokerId1)
-val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
val partition = new Partition(
@@ -2304,6 +2307,7 @@
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
assertTrue(makeLeader(
topicId = None,
@@ -2363,6 +2367,7 @@
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
assertTrue(makeLeader(
topicId = None,
@@ -2496,6 +2501,7 @@
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId)
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
assertTrue(makeLeader(
topicId = None,
@@ -2592,6 +2598,7 @@
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
val partitionEpoch = 1
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
doNothing().when(delayedOperations).checkAndCompleteAll()
@@ -2647,6 +2654,7 @@
val follower3 = brokerId + 3
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
doNothing().when(delayedOperations).checkAndCompleteAll()
@@ -3026,6 +3034,7 @@
val replicas = List(leaderId, followerId)
val leaderEpoch = 8
val topicId = Uuid.randomUuid()
+addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val initialLeaderState = new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(controllerEpoch)
@@ -3259,13 +3268,16 @@
def testAddAndRemoveListeners(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+val replicas = Seq(brokerId, brokerId + 1)
+val isr = replicas
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.makeLeader(
new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
-.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
-.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setIsr(isr.map(Int.box).asJava)
+.setReplicas(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
@@ -3350,13 +3362,16 @@
def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+val replicas = Seq(brokerId, brokerId + 1)
+val isr = Seq(brokerId, brokerId + 1)
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.makeLeader(
new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
-.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
-.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setIsr(isr.map(Int.box).asJava)
+.setReplicas(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
@@ -3442,13 +3457,16 @@
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
assertTrue(partition.log.isDefined)
+val replicas = Seq(brokerId, brokerId + 1)
+val isr = replicas
+addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
partition.makeLeader(
new LeaderAndIsrRequest.PartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
-.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
-.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setIsr(isr.map(Int.box).asJava)
+.setReplicas(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
@@ -3730,9 +3748,9 @@
)
}
-private def addBrokerEpochToMockMetadataCache(kRaftMetadataCache: KRaftMetadataCache, brokers: List[Int]): Unit = {
+private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: List[Int]): Unit = {
brokers.foreach { broker =>
-when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
+when(metadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
}
}
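
Because the epoch accessor now lives on the trait, this helper can stub a plain MetadataCache mock; nothing KRaft-specific is involved. Inline, the stubbing amounts to the following (Mockito, with this file's existing defaultBrokerEpoch utility; the two-broker replica set is illustrative):

    // Equivalent inline stubbing for a replica set of brokers 0 and 1.
    val cache = mock(classOf[MetadataCache])
    List(0, 1).foreach { id =>
      when(cache.getAliveBrokerEpoch(id)).thenReturn(Option(defaultBrokerEpoch(id)))
    }
    // Partition.isReplicaIsrEligible then reads the stub through the trait.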

View File

@@ -212,18 +212,13 @@ class KafkaApisTest extends Logging {
private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
if (featureVersions.isEmpty) return
-metadataCache match {
-case cache: KRaftMetadataCache =>
-when(cache.features()).thenReturn {
-new FinalizedFeatures(
-MetadataVersion.latestTesting,
-featureVersions.map { featureVersion =>
-featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
-}.toMap.asJava,
-0)
-}
-case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")
+when(metadataCache.features()).thenReturn {
+new FinalizedFeatures(
+MetadataVersion.latestTesting,
+featureVersions.map { featureVersion =>
+featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
+}.toMap.asJava,
+0)
+}
}

View File

@@ -709,7 +709,7 @@
}
@Test
-def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
+def testDescribeTopicResponse(): Unit = {
val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
val securityProtocol = SecurityProtocol.PLAINTEXT
@@ -810,7 +810,7 @@
}
// Basic test
-var result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
+var result = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
assertEquals(2, result.size)
var resultTopic = result(0)
assertEquals(topic0, resultTopic.name())
@@ -827,7 +827,7 @@
checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala)
// Quota reached
-var response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false)
+var response = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false)
result = response.topics().asScala.toList
assertEquals(1, result.size)
resultTopic = result(0)
@@ -840,7 +840,7 @@
assertEquals(2, response.nextCursor().partitionIndex())
// With start index
-result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList
+result = metadataCache.describeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList
assertEquals(1, result.size)
resultTopic = result(0)
assertEquals(topic0, resultTopic.name())
@@ -850,7 +850,7 @@
checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
// With start index and quota reached
-response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
+response = metadataCache.describeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
result = response.topics().asScala.toList
assertEquals(1, result.size)
@@ -864,7 +864,7 @@
assertEquals(0, response.nextCursor().partitionIndex())
// When the first topic does not exist
-result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList
+result = metadataCache.describeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList
assertEquals(2, result.size)
resultTopic = result(0)
assertEquals("Non-exist", resultTopic.name())

View File

@@ -2717,6 +2717,7 @@
thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+when(metadataCache.getAliveBrokerEpoch(leaderBrokerId)).thenReturn(Some(brokerEpoch))
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
"Produce", timer, 0, false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -3141,6 +3142,7 @@
when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
+when(metadataCache.getAliveBrokerEpoch(brokerId+1)).thenReturn(Some(brokerEpoch))
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
"Produce", timer, 0, false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](