KAFKA-7965; Fix testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup (#6557)

Most of the time, the group coordinator runs on broker 1. Occasionally the group coordinator will be placed on broker 2. If that's the case, the loop starting at line 320 have no chance to check and update `kickedOutConsumerIdx`. A quick fix is to safely do another round of loop to ensure `kickedOutConsumerIdx` always be checked after the last broker restart.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
huxi 2019-04-18 03:41:46 +08:00 committed by Jason Gustafson
parent 7f9b9a60da
commit a05eaaa8f4
1 changed files with 7 additions and 1 deletions

View File

@ -316,8 +316,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
// roll all brokers with a lesser max group size to make sure coordinator has the new config
val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
var kickedOutConsumerIdx: Option[Int] = None
val holdingGroupBrokers = servers.filter(!_.groupCoordinator.groupManager.currentGroups.isEmpty).map(_.config.brokerId)
// should only have one broker holding the group metadata
assertEquals(holdingGroupBrokers.size, 1)
val coordinator = holdingGroupBrokers.head
// ensure the coordinator broker will be restarted first
val orderedBrokersIds = List(coordinator) ++ servers.indices.toBuffer.filter(_ != coordinator)
// restart brokers until the group moves to a Coordinator with the new config
breakable { for (broker <- servers.indices) {
breakable { for (broker <- orderedBrokersIds) {
killBroker(broker)
consumerPollers.indices.foreach(idx => {
consumerPollers(idx).thrownException match {