From a05eaaa8f41db9fca86020845a3336acc105ee19 Mon Sep 17 00:00:00 2001 From: huxi Date: Thu, 18 Apr 2019 03:41:46 +0800 Subject: [PATCH] KAFKA-7965; Fix testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup (#6557) Most of the time, the group coordinator runs on broker 1. Occasionally the group coordinator will be placed on broker 2. If that's the case, the loop starting at line 320 have no chance to check and update `kickedOutConsumerIdx`. A quick fix is to safely do another round of loop to ensure `kickedOutConsumerIdx` always be checked after the last broker restart. Reviewers: Stanislav Kozlovski , Jason Gustafson --- .../scala/integration/kafka/api/ConsumerBounceTest.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index eabd515aeeb..2ce5fab4d6d 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -316,8 +316,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { // roll all brokers with a lesser max group size to make sure coordinator has the new config val newConfigs = generateKafkaConfigs(maxGroupSize.toString) var kickedOutConsumerIdx: Option[Int] = None + val holdingGroupBrokers = servers.filter(!_.groupCoordinator.groupManager.currentGroups.isEmpty).map(_.config.brokerId) + // should only have one broker holding the group metadata + assertEquals(holdingGroupBrokers.size, 1) + val coordinator = holdingGroupBrokers.head + // ensure the coordinator broker will be restarted first + val orderedBrokersIds = List(coordinator) ++ servers.indices.toBuffer.filter(_ != coordinator) // restart brokers until the group moves to a Coordinator with the new config - breakable { for (broker <- servers.indices) { + breakable { for (broker <- orderedBrokersIds) { killBroker(broker) consumerPollers.indices.foreach(idx => { consumerPollers(idx).thrownException match {