KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524)

In this commit we made sure that the auto leader election only happens after the newly starter broker is in the isr.

No accompany tests are added due to the fact that:

this is a change to the private method and no public facing change is made
it is hard to create tests for this change without considerable effort

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Leonard Ge 2020-04-27 21:51:10 +01:00 committed by GitHub
parent bd17085ec1
commit db9e55a50f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 3 deletions

View File

@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty && controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
controllerContext.allTopics.contains(tp.topic)) controllerContext.allTopics.contains(tp.topic) &&
canPreferredReplicaBeLeader(tp)
)
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered) onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
} }
} }
} }
private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
val assignment = controllerContext.partitionReplicaAssignment(tp)
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
val isr = controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr
PartitionLeaderElectionAlgorithms
.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
.nonEmpty
}
private def processAutoPreferredReplicaLeaderElection(): Unit = { private def processAutoPreferredReplicaLeaderElection(): Unit = {
if (!isActive) return if (!isActive) return
try { try {

View File

@ -433,8 +433,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
servers(1).shutdown() servers(otherBrokerId).shutdown()
servers(1).awaitShutdown() servers(otherBrokerId).awaitShutdown()
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) && leaderIsrAndControllerEpochMap.contains(tp) &&