MINOR: disable some rebootstrap tests, convert the others to KRaft (#17765)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2024-12-12 09:59:20 -08:00 committed by GitHub
parent 772aa241b2
commit 65820acad2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 42 additions and 88 deletions

View File

@ -20,7 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class AdminClientRebootstrapTest extends RebootstrapTest {
@ParameterizedTest
@ParameterizedTest(name = "{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {

View File

@ -17,11 +17,12 @@
package kafka.api
import kafka.api.ConsumerRebootstrapTest._
import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ConsumerRebootstrapTest extends RebootstrapTest {
@Disabled("KAFKA-17986")
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrap(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
@ -84,6 +86,7 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
}
@Disabled
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrapDisabled(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
@ -133,8 +136,8 @@ object ConsumerRebootstrapTest {
final val RebootstrapTestName = s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
def rebootstrapTestParams: stream.Stream[Arguments] = {
assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly.count())
val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
.findFirst().get.get
stream.Stream.of(
Arguments.of((args :+ true):_*),

View File

@ -235,7 +235,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
createProducer(), createConsumer(), adminClient)
}
case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaServer,
case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker,
producerClientId: String, consumerClientId: String,
override val producer: KafkaProducer[Array[Byte], Array[Byte]],
override val consumer: Consumer[Array[Byte], Array[Byte]],

View File

@ -18,11 +18,13 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class ProducerRebootstrapTest extends RebootstrapTest {
@ParameterizedTest
@Disabled("KAFKA-17986")
@ParameterizedTest(name = "{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
server1.shutdown()

View File

@ -16,7 +16,7 @@
*/
package kafka.api
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.{KafkaBroker, KafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@ -26,8 +26,8 @@ import java.util.Properties
abstract class RebootstrapTest extends AbstractConsumerTest {
override def brokerCount: Int = 2
def server0: KafkaServer = serverForId(0).get
def server1: KafkaServer = serverForId(1).get
def server0: KafkaBroker = serverForId(0).get
def server1: KafkaBroker = serverForId(1).get
override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
@ -36,7 +36,7 @@ abstract class RebootstrapTest extends AbstractConsumerTest {
// In this test, fixed ports are necessary, because brokers must have the
// same port after the restart.
FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
FixedPortTestUtils.createBrokerConfigs(brokerCount, null, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}

View File

@ -51,18 +51,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
/**
* Get the list of brokers, which could be either BrokerServer objects or KafkaServer objects.
* Get the list of brokers.
*/
def brokers: mutable.Buffer[KafkaBroker] = _brokers
/**
* Get the list of brokers, as instances of KafkaServer.
* This method should only be used when dealing with brokers that use ZooKeeper.
* Get the list of brokers.
*/
def servers: mutable.Buffer[KafkaServer] = {
checkIsZKTest()
_brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
}
def servers: mutable.Buffer[KafkaBroker] = brokers
def brokerServers: mutable.Buffer[BrokerServer] = {
checkIsKRaftTest()
@ -102,9 +98,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
instanceConfigs
}
def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
def serverForId(id: Int): Option[KafkaBroker] = brokers.find(s => s.config.brokerId == id)
def boundPort(server: KafkaServer): Int = server.boundPort(listenerName)
def boundPort(server: KafkaBroker): Int = server.boundPort(listenerName)
def bootstrapServers(listenerName: ListenerName = listenerName): String = {
TestUtils.bootstrapServers(_brokers, listenerName)
@ -345,47 +341,26 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
def getController(): KafkaServer = {
checkIsZKTest()
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
servers.filter(s => s.config.brokerId == controllerId).head
}
def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
val result = new util.HashMap[String, Uuid]()
if (isKRaftTest()) {
val topicIdsMap = controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
names.foreach { name =>
val response = topicIdsMap.get(name)
result.put(name, response.result())
}
} else {
val topicIdsMap = getController().kafkaController.controllerContext.topicIds.toMap
names.foreach { name =>
if (topicIdsMap.contains(name)) result.put(name, topicIdsMap(name))
}
val topicIdsMap = controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
names.foreach { name =>
val response = topicIdsMap.get(name)
result.put(name, response.result())
}
result.asScala.toMap
}
def getTopicIds(): Map[String, Uuid] = {
if (isKRaftTest()) {
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
} else {
getController().kafkaController.controllerContext.topicIds.toMap
}
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
}
def getTopicNames(): Map[Uuid, String] = {
if (isKRaftTest()) {
val result = new util.HashMap[Uuid, String]()
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
(key, value) => result.put(value, key)
}
result.asScala.toMap
} else {
getController().kafkaController.controllerContext.topicNames.toMap
val result = new util.HashMap[Uuid, String]()
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
(key, value) => result.put(value, key)
}
result.asScala.toMap
}
private def createBrokers(startup: Boolean): Unit = {

View File

@ -56,25 +56,9 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
}
def controllerSocketServer: SocketServer = {
if (isKRaftTest()) {
controllerServer.socketServer
} else {
servers.find { server =>
server.kafkaController.isActive
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No controller broker is available"))
}
}
def controllerSocketServer: SocketServer = controllerServer.socketServer
def notControllerSocketServer: SocketServer = {
if (isKRaftTest()) {
anySocketServer
} else {
servers.find { server =>
!server.kafkaController.isActive
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No non-controller broker is available"))
}
}
def notControllerSocketServer: SocketServer = anySocketServer
def brokerSocketServer(brokerId: Int): SocketServer = {
brokers.find { broker =>

View File

@ -20,7 +20,6 @@ import java.io.File
import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue}
import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.Consumer
@ -195,26 +194,17 @@ class LogDirFailureTest extends IntegrationTestHarness {
// Consumer should receive some messages
TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
if (quorum == "kraft") {
waitUntilTrue(() => {
// get the broker with broker.nodeId == originalLeaderServerId
val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
// check if the broker has the offline log dir
val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
// check if the broker has the offline replica
hasOfflineDir && brokerWithDirFail.exists(broker =>
broker.replicaManager.metadataCache
.getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
.partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
}, "Expected to find an offline log dir")
} else {
// There should be no remaining LogDirEventNotification znode
assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
// The controller should have marked the replica on the original leader as offline
val controllerServer = servers.find(_.kafkaController.isActive).get
val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), originalLeaderServerId)))
}
waitUntilTrue(() => {
// get the broker with broker.nodeId == originalLeaderServerId
val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
// check if the broker has the offline log dir
val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
// check if the broker has the offline replica
hasOfflineDir && brokerWithDirFail.exists(broker =>
broker.replicaManager.metadataCache
.getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
.partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
}, "Expected to find an offline log dir")
}