MINOR: convert ConsumerBounceTest to KRaft (#17997)

Reviewers: TengYao Chi <kitingiao@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2024-12-05 23:25:48 -08:00 committed by GitHub
parent e7d986e48c
commit 81ada393d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 35 additions and 22 deletions

View File

@ -26,10 +26,10 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Disabled, Test}
import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@ -54,17 +54,30 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
generateKafkaConfigs()
}
val testConfigs = Map[String, String](
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> "3", // don't want to lose offset
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG -> "50",
KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG -> "300",
)
override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
super.kraftControllerConfigs(testInfo).map(props => {
testConfigs.foreachEntry((k, v) => props.setProperty(k, v))
props
})
}
private def generateKafkaConfigs(maxGroupSize: String = maxGroupSize.toString): Seq[KafkaConfig] = {
val properties = new Properties
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "3") // don't want to lose offset
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") // set small enough session timeout
properties.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
properties.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, maxGroupSize)
properties.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
properties.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
testConfigs.foreachEntry((k, v) => properties.setProperty(k, v))
FixedPortTestUtils.createBrokerConfigs(brokerCount, null, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, properties))
}
@ -81,7 +94,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10)
/*
@ -126,7 +139,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@ -139,7 +152,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
consumer.seek(tp, 0)
// wait until all the followers have synced the last HW with leader
TestUtils.waitUntilTrue(() => servers.forall(server =>
TestUtils.waitUntilTrue(() => brokerServers.forall(server =>
server.replicaManager.localLog(tp).get.highWatermark == numRecords
), "Failed to update high watermark for followers after timeout")
@ -170,7 +183,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = {
val numRecords = 1000
val newtopic = "newtopic"
@ -210,7 +223,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
receiveExactRecords(poller, numRecords, 10000)
poller.shutdown()
servers.foreach(server => killBroker(server.config.brokerId))
brokerServers.foreach(server => killBroker(server.config.brokerId))
Thread.sleep(500)
restartDeadBrokers()
@ -222,7 +235,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testClose(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()
@ -293,7 +306,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords)
servers.foreach(server => killBroker(server.config.brokerId))
brokerServers.foreach(server => killBroker(server.config.brokerId))
val closeTimeout = 2000
val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout))
@ -325,7 +338,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
// roll all brokers with a lesser max group size to make sure coordinator has the new config
val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
for (serverIdx <- servers.indices) {
for (serverIdx <- brokerServers.indices) {
killBroker(serverIdx)
val config = newConfigs(serverIdx)
servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
@ -348,7 +361,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = {
val group = "fatal-exception-test"
val topic = "fatal-exception-test"
@ -387,7 +400,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* close should terminate immediately without sending leave group.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
val topic = "closetest"
createTopic(topic, 10, brokerCount)
@ -439,7 +452,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
// Trigger another rebalance and shutdown all brokers
// This consumer poll() doesn't complete and `tearDown` shuts down the executor and closes the consumer
createConsumerToRebalance()
servers.foreach(server => killBroker(server.config.brokerId))
brokerServers.foreach(server => killBroker(server.config.brokerId))
// consumer2 should close immediately without LeaveGroup request since there are no brokers available
val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))