KAFKA-17061 Improve the performance of isReplicaOnline (#16529)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Okada Haruki 2024-07-12 13:52:59 +09:00 committed by GitHub
parent ec9dabf86f
commit 01cf24a1ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 2 deletions

View File

@ -216,6 +216,10 @@ class ControllerContext extends ControllerChannelContext {
// getter // getter
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet.diff(shuttingDownBrokerIds) def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet.diff(shuttingDownBrokerIds)
// To just check if a broker is live, we should use this method instead of liveBrokerIds.contains(brokerId)
// which is more expensive because it constructs the set of live broker IDs.
// See KAFKA-17061 for the details.
def isLiveBroker(brokerId: Int): Boolean = liveBrokerEpochs.contains(brokerId) && !shuttingDownBrokerIds(brokerId)
def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet
def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers
def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs
@ -238,7 +242,7 @@ class ControllerContext extends ControllerChannelContext {
def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean): Boolean = { def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean): Boolean = {
val brokerOnline = { val brokerOnline = {
if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId) if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
else liveBrokerIds.contains(brokerId) else isLiveBroker(brokerId)
} }
brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicPartition) brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicPartition)
} }

View File

@ -2241,7 +2241,7 @@ class KafkaController(val config: KafkaConfig,
case ElectionType.UNCLEAN => case ElectionType.UNCLEAN =>
val currentLeader = controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader val currentLeader = controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader
currentLeader == LeaderAndIsr.NoLeader || !controllerContext.liveBrokerIds.contains(currentLeader) currentLeader == LeaderAndIsr.NoLeader || !controllerContext.isLiveBroker(currentLeader)
} }
} }