mirror of https://github.com/apache/kafka.git
KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments (#12543)
Reviewers: Justine Olshan <jolshan@confluent.io>, Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
parent
06dc67a480
commit
47adb86636
|
@ -1260,10 +1260,7 @@ class KafkaController(val config: KafkaConfig,
|
|||
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
|
||||
// that need to be on this broker
|
||||
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
|
||||
// do this check only if the broker is live and there are no partitions being reassigned currently
|
||||
// and preferred replica election is not in progress
|
||||
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp =>
|
||||
controllerContext.partitionsBeingReassigned.isEmpty &&
|
||||
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
|
||||
controllerContext.allTopics.contains(tp.topic) &&
|
||||
canPreferredReplicaBeLeader(tp)
|
||||
|
|
|
@ -490,6 +490,87 @@ class ControllerIntegrationTest extends QuorumTestHarness {
|
|||
"failed to get expected partition state upon broker startup")
|
||||
}
|
||||
|
||||
@Test
|
||||
def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
|
||||
servers = makeServers(3, autoLeaderRebalanceEnable = true)
|
||||
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
|
||||
val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
|
||||
val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
|
||||
|
||||
// Partition tp: [leaderBrokerId, controllerId]
|
||||
// Partition reassigningTp: [controllerId]
|
||||
val tp = new TopicPartition("t", 0)
|
||||
val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
|
||||
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
|
||||
val reassigningTp = new TopicPartition("reassigning", 0)
|
||||
val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
|
||||
TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
|
||||
|
||||
// Shutdown broker leaderBrokerId so that broker controllerId will be elected as leader for partition tp
|
||||
servers(leaderBrokerId).shutdown()
|
||||
servers(leaderBrokerId).awaitShutdown()
|
||||
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
|
||||
"failed to get expected partition state upon broker shutdown")
|
||||
|
||||
// Shutdown broker otherBrokerId and reassign partition reassigningTp from [controllerId] to [otherBrokerId]
|
||||
// to create a stuck reassignment.
|
||||
servers(otherBrokerId).shutdown()
|
||||
servers(otherBrokerId).awaitShutdown()
|
||||
val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
|
||||
zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
|
||||
waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
|
||||
"failed to get expected partition state during partition reassignment with offline replica")
|
||||
|
||||
// Start broker leaderBrokerId and make sure it is elected as leader (preferred) of partition tp automatically
|
||||
// even though there is some other ongoing reassignment.
|
||||
servers(leaderBrokerId).startup()
|
||||
waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
|
||||
"failed to get expected partition state upon leader broker startup")
|
||||
|
||||
// Start broker otherBrokerId and make sure the reassignment which was stuck can be fulfilled.
|
||||
servers(otherBrokerId).startup()
|
||||
waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
|
||||
"failed to get expected partition state upon other broker startup")
|
||||
TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(reassigningTp.topic)) == reassignment,
|
||||
"failed to get updated partition assignment on topic znode after partition reassignment")
|
||||
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
|
||||
"failed to remove reassign partitions path after completion")
|
||||
}
|
||||
|
||||
@Test
|
||||
def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
|
||||
servers = makeServers(3, autoLeaderRebalanceEnable = true)
|
||||
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
|
||||
val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
|
||||
val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
|
||||
|
||||
// Partition tp: [controllerId, leaderBrokerId]
|
||||
val tp = new TopicPartition("t", 0)
|
||||
val assignment = Map(tp.partition -> Seq(controllerId, leaderBrokerId))
|
||||
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
|
||||
|
||||
// Shutdown broker otherBrokerId and reassign partition tp from [controllerId, leaderBrokerId] to [leaderBrokerId, controllerId, otherBrokerId]
|
||||
// to create a stuck reassignment.
|
||||
servers(otherBrokerId).shutdown()
|
||||
servers(otherBrokerId).awaitShutdown()
|
||||
val reassignment = Map(tp -> ReplicaAssignment(Seq(leaderBrokerId, controllerId, otherBrokerId), List(), List()))
|
||||
zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
|
||||
|
||||
//Make sure broker leaderBrokerId is elected as leader (preferred) of partition tp automatically
|
||||
// even though the reassignment is still ongoing.
|
||||
waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
|
||||
"failed to get expected partition state after auto preferred replica leader election")
|
||||
|
||||
// Start broker otherBrokerId and make sure the reassignment which was stuck can be fulfilled.
|
||||
servers(otherBrokerId).startup()
|
||||
waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
|
||||
"failed to get expected partition state upon broker startup")
|
||||
TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
|
||||
"failed to get updated partition assignment on topic znode after partition reassignment")
|
||||
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
|
||||
"failed to remove reassign partitions path after completion")
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
|
||||
servers = makeServers(2)
|
||||
|
|
Loading…
Reference in New Issue