KAFKA-18226: Disable CustomQuotaCallbackTest and remove isKRaftTest (#18166)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
Ken Huang authored on 2024-12-16 23:46:39 +08:00, committed by GitHub
parent 9cc1547672
commit 92f61b36f4
28 changed files with 87 additions and 399 deletions
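The bulk of this diff is one mechanical substitution: with ZooKeeper support removed from the test harnesses, the zkConnectOrNull accessor no longer exists, so call sites now pass null for the still-nullable ZooKeeper connect-string parameter. A minimal sketch of the before/after pattern, assuming TestUtils.createBrokerConfigs keeps its nullable second parameter:

// Before: a real connect string in ZK mode, null in KRaft mode.
val oldConfigs = TestUtils.createBrokerConfigs(numServers, zkConnectOrNull)
// After: KRaft is the only mode, so the argument is always null.
val newConfigs = TestUtils.createBrokerConfigs(numServers, null)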

BaseProducerSendTest.scala

@@ -55,7 +55,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
TestUtils.createBrokerConfigs(
numServers,
- zkConnectOrNull,
+ null,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties

CustomQuotaCallbackTest.scala

@@ -19,7 +19,6 @@ import kafka.api.GroupedUserQuotaCallback._
import kafka.security.{JaasModule, JaasTestUtils}
import kafka.server._
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
- import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -31,7 +30,7 @@ import org.apache.kafka.common.{Cluster, Reconfigurable}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.quota._
import org.junit.jupiter.api.Assertions._
- import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -42,6 +41,7 @@ import java.{lang, util}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
+ @Disabled("KAFKA-18213")
class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -86,8 +86,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
super.configureSecurityBeforeServersStart(testInfo)
- zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+ createScramCredentials("", JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -148,7 +147,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
// Remove the second topic with large number of partitions, verify no longer throttled
- adminZkClient.deleteTopic(largeTopic)
+ deleteTopic(largeTopic)
user = addUser("group1_user3", brokerId)
user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota)
user.removeThrottleMetrics() // since group was throttled before
@@ -180,7 +179,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
- TestUtils.createTopic(zkClient, topic, assignment, servers)
+ TestUtils.createTopic(null, topic, assignment, servers)
}
private def createAdminClient(): Admin = {
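For reference, JUnit 5 applies @Disabled at the class level by skipping every test method in the class and reporting the given string as the reason. A minimal sketch (class and method names are illustrative):

import org.junit.jupiter.api.{Disabled, Test}

@Disabled("KAFKA-18213") // skipped until the tracked issue is resolved
class ExampleDisabledTest {
  @Test
  def testNeverRuns(): Unit = {
    // not executed while the class-level @Disabled is present
  }
}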

EndToEndClusterIdTest.scala

@@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockBrokerMetricsReporter].getName)
override def generateConfigs = {
- val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(serverCount, null, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)

IntegrationTestHarness.scala

@@ -66,7 +66,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
override def generateConfigs: Seq[KafkaConfig] = {
- val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, null, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
@@ -74,11 +74,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
}
- if(isKRaftTest()) {
- cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
- }
insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
@@ -103,7 +99,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
- if (isKRaftTest()) {
props.foreach { config =>
// Add a security protocol for the controller endpoints, if one is not already set.
val securityPairs = config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "").split(",")
@@ -115,7 +110,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
}
- }
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
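insertControllerListenersIfNeeded now runs unconditionally, since every test is a KRaft test. A rough sketch of what such a helper does, based on the security-pairs check visible above; the helper name and the PLAINTEXT default are assumptions:

import java.util.Properties

def ensureControllerListener(config: Properties): Unit = {
  val key = "listener.security.protocol.map"
  val pairs = config.getProperty(key, "")
  // Map the controller endpoint to a security protocol if nothing maps it yet.
  if (!pairs.split(",").exists(_.startsWith("CONTROLLER:"))) {
    val sep = if (pairs.isEmpty) "" else ","
    config.setProperty(key, s"$pairs${sep}CONTROLLER:PLAINTEXT")
  }
}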

ProducerCompressionTest.scala

@@ -48,7 +48,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(brokerId, null)
broker = createBroker(new KafkaConfig(props))
}

ProducerFailureHandlingTest.scala

@@ -52,7 +52,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
def generateConfigs =
- TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ TestUtils.createBrokerConfigs(numServers, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _

ProducerIdExpirationTest.scala

@@ -52,7 +52,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach

ProducerSendWhileDeletionTest.scala

@@ -78,7 +78,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
deleteTopic(topic, listenerName)
// Verify that the topic is deleted when no metadata request comes in
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+ TestUtils.verifyTopicDeletion(topic, 2, brokers)
// Producer should be able to send messages even after topic gets deleted and auto-created
assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())

TransactionsBounceTest.scala

@@ -69,7 +69,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
override def generateConfigs = {
- FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, null)
.map(KafkaConfig.fromProps(_, overridingProps))
}
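FixedPortTestUtils, per the comment above, assigns each broker a concrete port up front instead of binding to port 0 at startup. A sketch of the usual preallocation trick and why it carries the stated risk (helper name illustrative, not the Kafka utility):

import java.net.ServerSocket

// Bind n sockets to ephemeral ports, record the ports, then release them.
// Another process may grab a released port before the broker rebinds it,
// which is exactly the "small risk of port conflicts" accepted above.
def choosePorts(n: Int): Seq[Int] = {
  val sockets = (0 until n).map(_ => new ServerSocket(0))
  val ports = sockets.map(_.getLocalPort)
  sockets.foreach(_.close())
  ports
}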

TransactionsExpirationTest.scala

@@ -51,7 +51,7 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach

TransactionsWithMaxInFlightOneTest.scala

@@ -50,7 +50,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]()
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(numBrokers, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach

FetchFromFollowerIntegrationTest.scala

@@ -52,7 +52,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
}
override def generateConfigs: collection.Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true)
+ TestUtils.createBrokerConfigs(numNodes, null, enableControlledShutdown = false, enableFetchFromFollower = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}

QuorumTestHarness.scala

@@ -26,7 +26,6 @@ import java.util.{Collections, Locale, Optional, OptionalInt, Properties, stream
import java.util.concurrent.{CompletableFuture, TimeUnit}
import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
- import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
@@ -34,7 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
- import org.apache.kafka.common.utils.{Exit, Time, Utils}
+ import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
@@ -47,8 +46,6 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
- import org.apache.zookeeper.client.ZKClientConfig
- import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
import org.junit.jupiter.params.provider.Arguments
@@ -69,30 +66,6 @@ trait QuorumImplementation {
def shutdown(): Unit
}
- class ZooKeeperQuorumImplementation(
- val zookeeper: EmbeddedZookeeper,
- val zkConnect: String,
- val zkClient: KafkaZkClient,
- val adminZkClient: AdminZkClient,
- val log: Logging
- ) extends QuorumImplementation {
- override def createBroker(
- config: KafkaConfig,
- time: Time,
- startup: Boolean,
- threadNamePrefix: Option[String],
- ): KafkaBroker = {
- val server = new KafkaServer(config, time, threadNamePrefix)
- if (startup) server.startup()
- server
- }
- override def shutdown(): Unit = {
- Utils.closeQuietly(zkClient, "zk client")
- CoreUtils.swallow(zookeeper.shutdown(), log)
- }
- }
class KRaftQuorumImplementation(
val controllerServer: ControllerServer,
val faultHandlerFactory: FaultHandlerFactory,
@@ -172,11 +145,6 @@ class QuorumTestHarnessFaultHandlerFactory(
@Tag("integration")
abstract class QuorumTestHarness extends Logging {
- val zkConnectionTimeout = 10000
- val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
- val zkMaxInFlightRequests = Int.MaxValue
- protected def zkAclsEnabled: Option[Boolean] = None
/**
* When in KRaft mode, the security protocol to use for the controller listener.
@@ -193,10 +161,6 @@ abstract class QuorumTestHarness extends Logging {
private var testInfo: TestInfo = _
protected var implementation: QuorumImplementation = _
- def isKRaftTest(): Boolean = {
- TestInfoUtils.isKRaft(testInfo)
- }
def isShareGroupTest(): Boolean = {
TestInfoUtils.isShareGroupTest(testInfo)
}
@@ -214,53 +178,11 @@ abstract class QuorumTestHarness extends Logging {
gp.get
}
- def checkIsZKTest(): Unit = {
- if (isKRaftTest()) {
- throw new RuntimeException("This function can't be accessed when running the test " +
- "in KRaft mode. ZooKeeper mode is required.")
- }
- }
- def checkIsKRaftTest(): Unit = {
- if (!isKRaftTest()) {
- throw new RuntimeException("This function can't be accessed when running the test " +
- "in ZooKeeper mode. KRaft mode is required.")
- }
- }
- private def asZk(): ZooKeeperQuorumImplementation = {
- checkIsZKTest()
- implementation.asInstanceOf[ZooKeeperQuorumImplementation]
- }
- private def asKRaft(): KRaftQuorumImplementation = {
- checkIsKRaftTest()
- implementation.asInstanceOf[KRaftQuorumImplementation]
- }
- def zookeeper: EmbeddedZookeeper = asZk().zookeeper
- def zkClient: KafkaZkClient = asZk().zkClient
- def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else asZk().zkClient
- def adminZkClient: AdminZkClient = asZk().adminZkClient
- def zkPort: Int = asZk().zookeeper.port
- def zkConnect: String = s"127.0.0.1:$zkPort"
- def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect
+ private def asKRaft(): KRaftQuorumImplementation = implementation.asInstanceOf[KRaftQuorumImplementation]
def controllerServer: ControllerServer = asKRaft().controllerServer
- def controllerServers: Seq[ControllerServer] = {
- if (isKRaftTest()) {
- Seq(asKRaft().controllerServer)
- } else {
- Seq()
- }
- }
+ def controllerServers: Seq[ControllerServer] = Seq(asKRaft().controllerServer)
val faultHandlerFactory = new QuorumTestHarnessFaultHandlerFactory(new MockFaultHandler("quorumTestHarnessFaultHandler"))
@@ -297,13 +219,8 @@ abstract class QuorumTestHarness extends Logging {
val name = testInfo.getTestMethod.toScala
.map(_.toString)
.getOrElse("[unspecified]")
- if (TestInfoUtils.isKRaft(testInfo)) {
info(s"Running KRAFT test $name")
implementation = newKRaftQuorum(testInfo)
- } else {
- info(s"Running ZK test $name")
- implementation = newZooKeeperQuorum()
- }
}
def createBroker(
@@ -315,8 +232,6 @@ abstract class QuorumTestHarness extends Logging {
implementation.createBroker(config, time, startup, threadNamePrefix)
}
- def shutdownZooKeeper(): Unit = asZk().shutdown()
def shutdownKRaftController(): Unit = {
// Note that the RaftManager instance is left running; it will be shut down in tearDown()
val kRaftQuorumImplementation = asKRaft()
@@ -438,38 +353,6 @@ abstract class QuorumTestHarness extends Logging {
)
}
- private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
- val zookeeper = new EmbeddedZookeeper()
- var zkClient: KafkaZkClient = null
- var adminZkClient: AdminZkClient = null
- val zkConnect = s"127.0.0.1:${zookeeper.port}"
- try {
- zkClient = KafkaZkClient(
- zkConnect,
- zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
- zkSessionTimeout,
- zkConnectionTimeout,
- zkMaxInFlightRequests,
- Time.SYSTEM,
- name = "ZooKeeperTestHarness",
- new ZKClientConfig,
- enableEntityConfigControllerCheck = false)
- adminZkClient = new AdminZkClient(zkClient)
- } catch {
- case t: Throwable =>
- CoreUtils.swallow(zookeeper.shutdown(), this)
- Utils.closeQuietly(zkClient, "zk client")
- throw t
- }
- new ZooKeeperQuorumImplementation(
- zookeeper,
- zkConnect,
- zkClient,
- adminZkClient,
- this
- )
- }
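Where the deleted method bootstrapped an embedded ZooKeeper, KRaft test setup instead formats a metadata log directory before starting the controller. A sketch of the central step, using the MetaProperties builder imported above (the surrounding wiring in newKRaftQuorum is more involved):

val metaProperties = new MetaProperties.Builder()
  .setVersion(MetaPropertiesVersion.V1)
  .setClusterId(Uuid.randomUuid().toString)
  .setNodeId(1)
  .build()
// The properties are written into the metadata log dir so that the
// ControllerServer can verify them and start up against that directory.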
@AfterEach
def tearDown(): Unit = {
if (implementation != null) {
@@ -482,22 +365,9 @@ abstract class QuorumTestHarness extends Logging {
Configuration.setConfiguration(null)
faultHandler.maybeRethrowFirstException()
}
- // Trigger session expiry by reusing the session id in another client
- def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
- val dummyWatcher = new Watcher {
- override def process(event: WatchedEvent): Unit = {}
- }
- val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
- zooKeeper.getSessionId,
- zooKeeper.getSessionPasswd)
- assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
- anotherZkClient
- }
}
object QuorumTestHarness {
- val ZkClientEventThreadSuffix = "-EventThread"
/**
* Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
@@ -527,7 +397,6 @@ object QuorumTestHarness {
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
- QuorumTestHarness.ZkClientEventThreadSuffix,
KafkaEventQueue.EVENT_HANDLER_THREAD_SUFFIX,
ClientMetricsManager.CLIENT_METRICS_REAPER_THREAD_NAME,
SystemTimer.SYSTEM_TIMER_THREAD_PREFIX,
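These prefixes feed the thread-leak check described in the doc comment above. A condensed sketch of that kind of check (the helper below is illustrative, not the Kafka implementation):

import scala.jdk.CollectionConverters._

def assertNoUnexpectedThreads(expectedPrefixes: Set[String]): Unit = {
  // Any live thread whose name matches none of the known prefixes is a
  // leak left behind by a previous test.
  val unexpected = Thread.getAllStackTraces.keySet.asScala
    .map(_.getName)
    .filterNot(name => expectedPrefixes.exists(name.contains))
  assert(unexpected.isEmpty, s"Found unexpected threads: $unexpected")
}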

TestInfoUtils.scala

@@ -31,19 +31,6 @@ class EmptyTestInfo extends TestInfo {
}
object TestInfoUtils {
- def isKRaft(testInfo: TestInfo): Boolean = {
- if (testInfo.getDisplayName.contains("quorum=")) {
- if (testInfo.getDisplayName.contains("quorum=kraft")) {
- true
- } else if (testInfo.getDisplayName.contains("quorum=zk")) {
- false
- } else {
- throw new RuntimeException(s"Unknown quorum value")
- }
- } else {
- false
- }
- }
final val TestWithParameterizedQuorumAndGroupProtocolNames = "{displayName}.quorum={0}.groupProtocol={1}"
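With isKRaft gone, only the display-name template survives. For context, this is how the template is typically consumed by a parameterized test; the method-source name here is an assumption:

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
  // Rendered display name, e.g.:
  //   testProduceConsume(String, String).quorum=kraft.groupProtocol=classic
}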

AddPartitionsTest.scala

@@ -57,10 +57,7 @@ class AddPartitionsTest extends BaseRequestTest {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- if (isKRaftTest()) {
brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
- }
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })

KafkaServerTestHarness.scala

@@ -20,10 +20,7 @@ package kafka.integration
import kafka.server._
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
- import kafka.zk.KafkaZkClient
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
- import org.apache.kafka.common.errors.TopicExistsException
- import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.ResourcePattern
@@ -60,10 +57,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
*/
def servers: mutable.Buffer[KafkaBroker] = brokers
- def brokerServers: mutable.Buffer[BrokerServer] = {
- checkIsKRaftTest()
- _brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
- }
+ def brokerServers: mutable.Buffer[BrokerServer] = _brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
var alive: Array[Boolean] = _
@@ -155,13 +149,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
listenerName: ListenerName = listenerName,
adminClientConfig: Properties = new Properties
): Unit = {
- if (isKRaftTest()) {
Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
}
- } else {
- createOffsetsTopic(zkClient, servers)
- }
}
/**
@@ -177,7 +167,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
listenerName: ListenerName = listenerName,
adminClientConfig: Properties = new Properties
): scala.collection.immutable.Map[Int, Int] = {
- if (isKRaftTest()) {
Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
@@ -189,16 +178,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
topicConfig = topicConfig
)
}
- } else {
- TestUtils.createTopic(
- zkClient = zkClient,
- topic = topic,
- numPartitions = numPartitions,
- replicationFactor = replicationFactor,
- servers = servers,
- topicConfig = topicConfig
- )
- }
}
/**
@@ -210,8 +189,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
listenerName: ListenerName = listenerName
- ): scala.collection.immutable.Map[Int, Int] =
- if (isKRaftTest()) {
+ ): scala.collection.immutable.Map[Int, Int] = {
Using.resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
@@ -221,20 +199,12 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
controllers = controllerServers
)
}
- } else {
- TestUtils.createTopic(
- zkClient,
- topic,
- partitionReplicaAssignment,
- servers
- )
- }
def deleteTopic(
topic: String,
listenerName: ListenerName = listenerName
): Unit = {
- if (isKRaftTest()) {
Using.resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.deleteTopicWithAdmin(
admin = admin,
@@ -242,9 +212,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
brokers = aliveBrokers,
controllers = controllerServers)
}
- } else {
- adminZkClient.deleteTopic(topic)
- }
}
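Topic management in the harness now always goes through an Admin client. For comparison, a self-contained sketch of the equivalent operations against the public Admin API (topic name, partition count, and replication factor are examples):

import java.util.Collections
import org.apache.kafka.clients.admin.{Admin, NewTopic}

def recreateTopic(admin: Admin, topic: String): Unit = {
  admin.deleteTopics(Collections.singleton(topic)).all().get()
  // 2 partitions, replication factor 1; get() blocks until the controller accepts.
  admin.createTopics(Collections.singleton(new NewTopic(topic, 2, 1.toShort))).all().get()
}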
def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
@@ -380,11 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = {
- if (isKRaftTest()) {
createBroker(config, brokerTime(config.brokerId), startup = false)
- } else {
- TestUtils.createServer(config, time = brokerTime(config.brokerId), threadNamePrefix = None, startup = false)
- }
}
def aliveBrokers: Seq[KafkaBroker] = {
@@ -392,16 +355,13 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
def ensureConsistentKRaftMetadata(): Unit = {
- if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(
aliveBrokers,
controllerServer
)
- }
}
def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
- if (isKRaftTest()) {
Using.resource(createAdminClient(brokers, listenerName)) {
admin => {
admin.alterClientQuotas(Collections.singleton(
@@ -411,44 +371,4 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
}
- else {
- adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
- }
}
- /**
- * Ensures that the consumer offsets/group metadata topic exists. If it does not, the topic is created and the method waits
- * until the leader is elected and metadata is propagated to all brokers. If it does, the method verifies that it has
- * the expected number of partition and replication factor however it does not guarantee that the topic is empty.
- */
- private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
- val server = servers.head
- val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions
- val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt
- try {
- TestUtils.createTopic(
- zkClient,
- Topic.GROUP_METADATA_TOPIC_NAME,
- numPartitions,
- replicationFactor,
- servers,
- server.groupCoordinator.groupMetadataTopicConfigs
- )
- } catch {
- case ex: TopicExistsException =>
- val allPartitionsMetadata = waitForAllPartitionsMetadata(
- servers,
- Topic.GROUP_METADATA_TOPIC_NAME,
- numPartitions
- )
- // If the topic already exists, we ensure that it has the required
- // number of partitions and replication factor. If it has not, the
- // exception is thrown further.
- if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {
- throw ex
- }
- }
- }
}
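The surviving changeClientIdConfig body (truncated above) routes quota changes through Admin.alterClientQuotas. A self-contained sketch of that call shape; the quota key and value are examples:

import java.util.Collections
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}

def setClientIdQuota(admin: Admin, clientId: String, producerByteRate: Double): Unit = {
  val entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, clientId))
  val op = new ClientQuotaAlteration.Op("producer_byte_rate", producerByteRate)
  admin.alterClientQuotas(
    Collections.singleton(new ClientQuotaAlteration(entity, Collections.singleton(op)))
  ).all().get()
}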

MetricsDuringTopicCreationDeletionTest.scala

@@ -51,7 +51,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
@volatile private var running = true
- override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnectOrNull)
+ override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, null)
.map(KafkaConfig.fromProps(_, overridingProps))
@BeforeEach
@@ -148,7 +148,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
for (t <- topics if running) {
try {
deleteTopic(t)
- TestUtils.verifyTopicDeletion(null, t, partitionNum, servers)
+ TestUtils.verifyTopicDeletion(t, partitionNum, servers)
} catch {
case e: Exception => e.printStackTrace()
}

MinIsrConfigTest.scala

@@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource
class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "5")
- def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, overridingProps))
+ def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, null).map(KafkaConfig.fromProps(_, overridingProps))
@ParameterizedTest
@ValueSource(strings = Array("kraft"))

UncleanLeaderElectionTest.scala

@@ -71,8 +71,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull)
- configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull)
+ configProps1 = createBrokerConfig(brokerId1, null)
+ configProps2 = createBrokerConfig(brokerId2, null)
for (configProps <- List(configProps1, configProps2)) {
configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString)

MetricsTest.scala

@@ -62,7 +62,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val topic = "test-topic-metric"
createTopic(topic)
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+ TestUtils.verifyTopicDeletion(topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
@@ -77,7 +77,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+ TestUtils.verifyTopicDeletion(topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}

BaseRequestTest.scala

@@ -69,16 +69,9 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
/**
* Return the socket server where admin requests should be sent.
*
- * For KRaft clusters that is any broker as the broker will forward the request to the active
- * controller. For Legacy clusters that is the controller broker.
+ * For KRaft clusters, that is any broker, as the broker will forward the request to the active controller.
*/
- def adminSocketServer: SocketServer = {
- if (isKRaftTest()) {
- anySocketServer
- } else {
- controllerSocketServer
- }
- }
+ def adminSocketServer: SocketServer = anySocketServer
def connect(socketServer: SocketServer = anySocketServer,
listenerName: ListenerName = listenerName): Socket = {

DeleteTopicsRequestWithDeletionDisabledTest.scala

@@ -40,7 +40,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
}
override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull,
+ val props = TestUtils.createBrokerConfigs(brokerCount, null,
enableControlledShutdown = false, enableDeleteTopic = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)

EdgeCaseRequestTest.scala

@@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
def generateConfigs = {
- val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(1, null)
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
List(KafkaConfig.fromProps(props))
}

KafkaMetricsReporterTest.scala

@@ -71,7 +71,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(1, null)
props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")

LogRecoveryTest.scala

@@ -82,7 +82,7 @@ class LogRecoveryTest extends QuorumTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ configs = TestUtils.createBrokerConfigs(2, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
// start both servers
server1 = createBroker(configProps1)

ReplicationQuotasTest.scala

@@ -88,7 +88,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
* regular replication works as expected.
*/
- brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
//Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
//And two extra partitions 6,7, which we don't intend on throttling.
@@ -202,7 +202,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
*/
//2 brokers with 1MB Segment Size & 1 partition
- val config: Properties = createBrokerConfig(100, zkConnectOrNull)
+ val config: Properties = createBrokerConfig(100, null)
config.put("log.segment.bytes", (1024 * 1024).toString)
brokers = Seq(createBroker(fromProps(config)))
@@ -231,7 +231,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
//Start the new broker (and hence start replicating)
debug("Starting new broker")
- brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull)))
+ brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, null)))
val start = System.currentTimeMillis()
waitForOffsetsToMatch(msgCount, 0, 101)
@@ -261,7 +261,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
def createBrokers(brokerIds: Seq[Int]): Unit = {
brokerIds.foreach { id =>
- brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull)))
+ brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, null)))
}
}

LeaderEpochIntegrationTest.scala

@@ -68,7 +68,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: String): Unit = {
- brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
// Given two topics with replication of a single partition
for (topic <- List(topic1, topic2)) {
@@ -103,7 +103,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = {
//3 brokers, put partition on 100/101 and then pretend to be 102
- brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
createTopic(topic1, assignment1)
@@ -150,10 +150,10 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
@ValueSource(strings = Array("kraft"))
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
//Setup: we are only interested in the single partition on broker 101
- brokers += createBroker(fromProps(createBrokerConfig(100, zkConnectOrNull)))
+ brokers += createBroker(fromProps(createBrokerConfig(100, null)))
assertEquals(controllerServer.config.nodeId, waitUntilQuorumLeaderElected(controllerServer))
- brokers += createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull)))
+ brokers += createBroker(fromProps(createBrokerConfig(101, null)))
def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset

TestUtils.scala

@@ -526,39 +526,6 @@ object TestUtils extends Logging {
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
}
- /**
- * Create a topic in ZooKeeper.
- * Wait until the leader is elected and the metadata is propagated to all brokers.
- * Return the leader for each partition.
- */
- def createTopic(zkClient: KafkaZkClient,
- topic: String,
- numPartitions: Int = 1,
- replicationFactor: Int = 1,
- servers: Seq[KafkaBroker],
- topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
- val adminZkClient = new AdminZkClient(zkClient)
- // create topic
- waitUntilTrue( () => {
- var hasSessionExpirationException = false
- try {
- adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
- } catch {
- case _: SessionExpiredException => hasSessionExpirationException = true
- case e: Throwable => throw e // let other exceptions propagate
- !hasSessionExpirationException},
- s"Can't create topic $topic")
- // wait until we've propagated all partitions metadata to all servers
- val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, numPartitions)
- (0 until numPartitions).map { i =>
- i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
- throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache"))
- }.toMap
- }
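The removed helper's admin-based replacement appears at call sites earlier in this commit (createTopicWithAdmin). A sketch of the call shape; parameter names beyond those visible above are assumptions:

val leaders: Map[Int, Int] = TestUtils.createTopicWithAdmin(
  admin = admin,
  topic = "test-topic",
  brokers = brokers,
  controllers = controllerServers,
  numPartitions = 4,
  replicationFactor = 1
)
// Like the deleted ZK path, it waits for metadata propagation and returns
// the elected leader for each partition.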
/**
* Create a topic in ZooKeeper using a customized replica assignment.
* Wait until the leader is elected and the metadata is propagated to all brokers.
@@ -698,31 +665,6 @@ object TestUtils extends Logging {
new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
}
- /**
- * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
- * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
- * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
- *
- * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
- * LeaderDuringDelete).
- * @throws AssertionError if the expected condition is not true within the timeout.
- */
- def waitUntilLeaderIsElectedOrChanged(
- zkClient: KafkaZkClient,
- topic: String,
- partition: Int,
- timeoutMs: Long = 30000L,
- oldLeaderOpt: Option[Int] = None,
- newLeaderOpt: Option[Int] = None,
- ignoreNoLeader: Boolean = false
- ): Int = {
- def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
- zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
- .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NO_LEADER)
- }
- doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
- }
/**
* If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
* If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
@@ -1033,11 +975,6 @@ object TestUtils extends Logging {
}, msg)
}
- def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
- val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
- controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
- }
}
def awaitLeaderChange[B <: KafkaBroker](
brokers: Seq[B],
tp: TopicPartition,
@@ -1240,18 +1177,10 @@ object TestUtils extends Logging {
}
def verifyTopicDeletion[B <: KafkaBroker](
- zkClient: KafkaZkClient,
topic: String,
numPartitions: Int,
brokers: Seq[B]): Unit = {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
- if (zkClient != null) {
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
- "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic))
- waitUntilTrue(() => !zkClient.topicExists(topic),
- "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
- }
// ensure that the topic-partition has been deleted from all brokers' replica managers
waitUntilTrue(() =>
brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)),