diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 99aefe0e51b..c8c36730f2c 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -55,7 +55,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString) TestUtils.createBrokerConfigs( numServers, - zkConnectOrNull, + null, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index 4141342c7a9..a18c03fb593 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -19,7 +19,6 @@ import kafka.api.GroupedUserQuotaCallback._ import kafka.security.{JaasModule, JaasTestUtils} import kafka.server._ import kafka.utils.{Logging, TestInfoUtils, TestUtils} -import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} @@ -31,7 +30,7 @@ import org.apache.kafka.common.{Cluster, Reconfigurable} import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs} import org.apache.kafka.server.quota._ import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -42,6 +41,7 @@ import java.{lang, util} import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ +@Disabled("KAFKA-18213") class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { override protected def securityProtocol = SecurityProtocol.SASL_SSL @@ -86,8 +86,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { super.configureSecurityBeforeServersStart(testInfo) - zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) - createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) + createScramCredentials("", JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @@ -148,7 +147,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true) // Remove the second topic with large number of partitions, verify no longer throttled - adminZkClient.deleteTopic(largeTopic) + deleteTopic(largeTopic) user = addUser("group1_user3", brokerId) user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota) user.removeThrottleMetrics() // since group was throttled before @@ -180,7 +179,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = { val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap - TestUtils.createTopic(zkClient, topic, assignment, servers) + TestUtils.createTopic(null, topic, assignment, servers) } private def createAdminClient(): Admin = { diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala index 6c8d119daee..0cc927bd11d 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala @@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness { this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockBrokerMetricsReporter].getName) override def generateConfigs = { - val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol), + val cfgs = TestUtils.createBrokerConfigs(serverCount, null, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach(_ ++= serverConfig) cfgs.map(KafkaConfig.fromProps) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index b7efed1d495..b660ff35219 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -66,7 +66,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { } override def generateConfigs: Seq[KafkaConfig] = { - val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol), + val cfgs = TestUtils.createBrokerConfigs(brokerCount, null, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) configureListeners(cfgs) modifyConfigs(cfgs) @@ -74,11 +74,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share")) cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")) } - - if(isKRaftTest()) { - cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath)) - } - + cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath)) insertControllerListenersIfNeeded(cfgs) cfgs.map(KafkaConfig.fromProps) } @@ -103,16 +99,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { } private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = { - if (isKRaftTest()) { - props.foreach { config => - // Add a security protocol for the controller endpoints, if one is not already set. - val securityPairs = config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "").split(",") - val toAdd = config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "").split(",").filter( - e => !securityPairs.exists(_.startsWith(s"$e:"))) - if (toAdd.nonEmpty) { - config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, (securityPairs ++ - toAdd.map(e => s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(",")) - } + props.foreach { config => + // Add a security protocol for the controller endpoints, if one is not already set. + val securityPairs = config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "").split(",") + val toAdd = config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "").split(",").filter( + e => !securityPairs.exists(_.startsWith(s"$e:"))) + if (toAdd.nonEmpty) { + config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, (securityPairs ++ + toAdd.map(e => s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(",")) } } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 521b9cc0a0f..31375752892 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -48,7 +48,7 @@ class ProducerCompressionTest extends QuorumTestHarness { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull) + val props = TestUtils.createBrokerConfig(brokerId, null) broker = createBroker(new KafkaConfig(props)) } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index a30d440f325..92eac4f1230 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -52,7 +52,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) def generateConfigs = - TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + TestUtils.createBrokerConfigs(numServers, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _ private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _ diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala index 918d79b436e..2dee826dc9f 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala @@ -52,7 +52,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness { var admin: Admin = _ override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps())) + TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps())) } @BeforeEach diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala index 397e4660da7..8d3e76e7448 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala @@ -78,7 +78,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { deleteTopic(topic, listenerName) // Verify that the topic is deleted when no metadata request comes in - TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers) + TestUtils.verifyTopicDeletion(topic, 2, brokers) // Producer should be able to send messages even after topic gets deleted and auto-created assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic()) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index 1d1eca60ead..4f6fb7e483c 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -69,7 +69,7 @@ class TransactionsBounceTest extends IntegrationTestHarness { // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. override def generateConfigs = { - FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull) + FixedPortTestUtils.createBrokerConfigs(brokerCount, null) .map(KafkaConfig.fromProps(_, overridingProps)) } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala index b0e291c5efc..26ed880aa1a 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala @@ -51,7 +51,7 @@ class TransactionsExpirationTest extends KafkaServerTestHarness { var admin: Admin = _ override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps())) + TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps())) } @BeforeEach diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala index 53899a43743..fe1ea323162 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala @@ -50,7 +50,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]() override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps())) + TestUtils.createBrokerConfigs(numBrokers, null).map(KafkaConfig.fromProps(_, serverProps())) } @BeforeEach diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala index 8a8013921db..31f3663359e 100644 --- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala @@ -52,7 +52,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { } override def generateConfigs: collection.Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + TestUtils.createBrokerConfigs(numNodes, null, enableControlledShutdown = false, enableFetchFromFollower = true) .map(KafkaConfig.fromProps(_, overridingProps)) } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index ce953990af8..9c9067ef113 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -26,7 +26,6 @@ import java.util.{Collections, Locale, Optional, OptionalInt, Properties, stream import java.util.concurrent.{CompletableFuture, TimeUnit} import javax.security.auth.login.Configuration import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils} -import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient} import org.apache.kafka.clients.admin.AdminClientUnitTestEnv import org.apache.kafka.clients.consumer.GroupProtocol import org.apache.kafka.clients.consumer.internals.AbstractCoordinator @@ -34,7 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.utils.{Exit, Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.common.{DirectoryId, Uuid} import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion} @@ -47,8 +46,6 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.kafka.server.util.timer.SystemTimer -import org.apache.zookeeper.client.ZKClientConfig -import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo} import org.junit.jupiter.params.provider.Arguments @@ -69,30 +66,6 @@ trait QuorumImplementation { def shutdown(): Unit } -class ZooKeeperQuorumImplementation( - val zookeeper: EmbeddedZookeeper, - val zkConnect: String, - val zkClient: KafkaZkClient, - val adminZkClient: AdminZkClient, - val log: Logging -) extends QuorumImplementation { - override def createBroker( - config: KafkaConfig, - time: Time, - startup: Boolean, - threadNamePrefix: Option[String], - ): KafkaBroker = { - val server = new KafkaServer(config, time, threadNamePrefix) - if (startup) server.startup() - server - } - - override def shutdown(): Unit = { - Utils.closeQuietly(zkClient, "zk client") - CoreUtils.swallow(zookeeper.shutdown(), log) - } -} - class KRaftQuorumImplementation( val controllerServer: ControllerServer, val faultHandlerFactory: FaultHandlerFactory, @@ -172,11 +145,6 @@ class QuorumTestHarnessFaultHandlerFactory( @Tag("integration") abstract class QuorumTestHarness extends Logging { - val zkConnectionTimeout = 10000 - val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs - val zkMaxInFlightRequests = Int.MaxValue - - protected def zkAclsEnabled: Option[Boolean] = None /** * When in KRaft mode, the security protocol to use for the controller listener. @@ -193,10 +161,6 @@ abstract class QuorumTestHarness extends Logging { private var testInfo: TestInfo = _ protected var implementation: QuorumImplementation = _ - def isKRaftTest(): Boolean = { - TestInfoUtils.isKRaft(testInfo) - } - def isShareGroupTest(): Boolean = { TestInfoUtils.isShareGroupTest(testInfo) } @@ -214,53 +178,11 @@ abstract class QuorumTestHarness extends Logging { gp.get } - def checkIsZKTest(): Unit = { - if (isKRaftTest()) { - throw new RuntimeException("This function can't be accessed when running the test " + - "in KRaft mode. ZooKeeper mode is required.") - } - } - - def checkIsKRaftTest(): Unit = { - if (!isKRaftTest()) { - throw new RuntimeException("This function can't be accessed when running the test " + - "in ZooKeeper mode. KRaft mode is required.") - } - } - - private def asZk(): ZooKeeperQuorumImplementation = { - checkIsZKTest() - implementation.asInstanceOf[ZooKeeperQuorumImplementation] - } - - private def asKRaft(): KRaftQuorumImplementation = { - checkIsKRaftTest() - implementation.asInstanceOf[KRaftQuorumImplementation] - } - - def zookeeper: EmbeddedZookeeper = asZk().zookeeper - - def zkClient: KafkaZkClient = asZk().zkClient - - def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else asZk().zkClient - - def adminZkClient: AdminZkClient = asZk().adminZkClient - - def zkPort: Int = asZk().zookeeper.port - - def zkConnect: String = s"127.0.0.1:$zkPort" - - def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect + private def asKRaft(): KRaftQuorumImplementation = implementation.asInstanceOf[KRaftQuorumImplementation] def controllerServer: ControllerServer = asKRaft().controllerServer - def controllerServers: Seq[ControllerServer] = { - if (isKRaftTest()) { - Seq(asKRaft().controllerServer) - } else { - Seq() - } - } + def controllerServers: Seq[ControllerServer] = Seq(asKRaft().controllerServer) val faultHandlerFactory = new QuorumTestHarnessFaultHandlerFactory(new MockFaultHandler("quorumTestHarnessFaultHandler")) @@ -297,13 +219,8 @@ abstract class QuorumTestHarness extends Logging { val name = testInfo.getTestMethod.toScala .map(_.toString) .getOrElse("[unspecified]") - if (TestInfoUtils.isKRaft(testInfo)) { - info(s"Running KRAFT test $name") - implementation = newKRaftQuorum(testInfo) - } else { - info(s"Running ZK test $name") - implementation = newZooKeeperQuorum() - } + info(s"Running KRAFT test $name") + implementation = newKRaftQuorum(testInfo) } def createBroker( @@ -315,8 +232,6 @@ abstract class QuorumTestHarness extends Logging { implementation.createBroker(config, time, startup, threadNamePrefix) } - def shutdownZooKeeper(): Unit = asZk().shutdown() - def shutdownKRaftController(): Unit = { // Note that the RaftManager instance is left running; it will be shut down in tearDown() val kRaftQuorumImplementation = asKRaft() @@ -438,38 +353,6 @@ abstract class QuorumTestHarness extends Logging { ) } - private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = { - val zookeeper = new EmbeddedZookeeper() - var zkClient: KafkaZkClient = null - var adminZkClient: AdminZkClient = null - val zkConnect = s"127.0.0.1:${zookeeper.port}" - try { - zkClient = KafkaZkClient( - zkConnect, - zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), - zkSessionTimeout, - zkConnectionTimeout, - zkMaxInFlightRequests, - Time.SYSTEM, - name = "ZooKeeperTestHarness", - new ZKClientConfig, - enableEntityConfigControllerCheck = false) - adminZkClient = new AdminZkClient(zkClient) - } catch { - case t: Throwable => - CoreUtils.swallow(zookeeper.shutdown(), this) - Utils.closeQuietly(zkClient, "zk client") - throw t - } - new ZooKeeperQuorumImplementation( - zookeeper, - zkConnect, - zkClient, - adminZkClient, - this - ) - } - @AfterEach def tearDown(): Unit = { if (implementation != null) { @@ -482,22 +365,9 @@ abstract class QuorumTestHarness extends Logging { Configuration.setConfiguration(null) faultHandler.maybeRethrowFirstException() } - - // Trigger session expiry by reusing the session id in another client - def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = { - val dummyWatcher = new Watcher { - override def process(event: WatchedEvent): Unit = {} - } - val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher, - zooKeeper.getSessionId, - zooKeeper.getSessionPasswd) - assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works - anotherZkClient - } } object QuorumTestHarness { - val ZkClientEventThreadSuffix = "-EventThread" /** * Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread. @@ -527,7 +397,6 @@ object QuorumTestHarness { KafkaProducer.NETWORK_THREAD_PREFIX, AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, - QuorumTestHarness.ZkClientEventThreadSuffix, KafkaEventQueue.EVENT_HANDLER_THREAD_SUFFIX, ClientMetricsManager.CLIENT_METRICS_REAPER_THREAD_NAME, SystemTimer.SYSTEM_TIMER_THREAD_PREFIX, diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index cd22727839e..a74d1ca1612 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -31,20 +31,7 @@ class EmptyTestInfo extends TestInfo { } object TestInfoUtils { - def isKRaft(testInfo: TestInfo): Boolean = { - if (testInfo.getDisplayName.contains("quorum=")) { - if (testInfo.getDisplayName.contains("quorum=kraft")) { - true - } else if (testInfo.getDisplayName.contains("quorum=zk")) { - false - } else { - throw new RuntimeException(s"Unknown quorum value") - } - } else { - false - } - } - + final val TestWithParameterizedQuorumAndGroupProtocolNames = "{displayName}.quorum={0}.groupProtocol={1}" def isShareGroupTest(testInfo: TestInfo): Boolean = { diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 384d067e2b6..a9901ba65e7 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -57,10 +57,7 @@ class AddPartitionsTest extends BaseRequestTest { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - - if (isKRaftTest()) { - brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get()) - } + brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get()) createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas }) createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas }) createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas }) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 162b14760fd..0fbf0143748 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -20,10 +20,7 @@ package kafka.integration import kafka.server._ import kafka.utils.TestUtils import kafka.utils.TestUtils._ -import kafka.zk.KafkaZkClient import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter} -import org.apache.kafka.common.errors.TopicExistsException -import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.resource.ResourcePattern @@ -60,10 +57,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { */ def servers: mutable.Buffer[KafkaBroker] = brokers - def brokerServers: mutable.Buffer[BrokerServer] = { - checkIsKRaftTest() - _brokers.asInstanceOf[mutable.Buffer[BrokerServer]] - } + def brokerServers: mutable.Buffer[BrokerServer] = _brokers.asInstanceOf[mutable.Buffer[BrokerServer]] var alive: Array[Boolean] = _ @@ -155,12 +149,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { listenerName: ListenerName = listenerName, adminClientConfig: Properties = new Properties ): Unit = { - if (isKRaftTest()) { - Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => - TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers) - } - } else { - createOffsetsTopic(zkClient, servers) + Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => + TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers) } } @@ -177,25 +167,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { listenerName: ListenerName = listenerName, adminClientConfig: Properties = new Properties ): scala.collection.immutable.Map[Int, Int] = { - if (isKRaftTest()) { - Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => - TestUtils.createTopicWithAdmin( - admin = admin, - topic = topic, - brokers = brokers, - controllers = controllerServers, - numPartitions = numPartitions, - replicationFactor = replicationFactor, - topicConfig = topicConfig - ) - } - } else { - TestUtils.createTopic( - zkClient = zkClient, + Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => + TestUtils.createTopicWithAdmin( + admin = admin, topic = topic, + brokers = brokers, + controllers = controllerServers, numPartitions = numPartitions, replicationFactor = replicationFactor, - servers = servers, topicConfig = topicConfig ) } @@ -210,40 +189,28 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], listenerName: ListenerName = listenerName - ): scala.collection.immutable.Map[Int, Int] = - if (isKRaftTest()) { - Using.resource(createAdminClient(brokers, listenerName)) { admin => - TestUtils.createTopicWithAdmin( - admin = admin, - topic = topic, - replicaAssignment = partitionReplicaAssignment, - brokers = brokers, - controllers = controllerServers - ) - } - } else { - TestUtils.createTopic( - zkClient, - topic, - partitionReplicaAssignment, - servers + ): scala.collection.immutable.Map[Int, Int] = { + Using.resource(createAdminClient(brokers, listenerName)) { admin => + TestUtils.createTopicWithAdmin( + admin = admin, + topic = topic, + replicaAssignment = partitionReplicaAssignment, + brokers = brokers, + controllers = controllerServers ) } + } def deleteTopic( topic: String, listenerName: ListenerName = listenerName ): Unit = { - if (isKRaftTest()) { - Using.resource(createAdminClient(brokers, listenerName)) { admin => - TestUtils.deleteTopicWithAdmin( - admin = admin, - topic = topic, - brokers = aliveBrokers, - controllers = controllerServers) - } - } else { - adminZkClient.deleteTopic(topic) + Using.resource(createAdminClient(brokers, listenerName)) { admin => + TestUtils.deleteTopicWithAdmin( + admin = admin, + topic = topic, + brokers = aliveBrokers, + controllers = controllerServers) } } @@ -380,11 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = { - if (isKRaftTest()) { - createBroker(config, brokerTime(config.brokerId), startup = false) - } else { - TestUtils.createServer(config, time = brokerTime(config.brokerId), threadNamePrefix = None, startup = false) - } + createBroker(config, brokerTime(config.brokerId), startup = false) } def aliveBrokers: Seq[KafkaBroker] = { @@ -392,63 +355,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } def ensureConsistentKRaftMetadata(): Unit = { - if (isKRaftTest()) { - TestUtils.ensureConsistentKRaftMetadata( - aliveBrokers, - controllerServer - ) - } + TestUtils.ensureConsistentKRaftMetadata( + aliveBrokers, + controllerServer + ) } def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = { - if (isKRaftTest()) { - Using.resource(createAdminClient(brokers, listenerName)) { - admin => { - admin.alterClientQuotas(Collections.singleton( - new ClientQuotaAlteration( - new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "") null else sanitizedClientId)).asJava), - configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get() - } + Using.resource(createAdminClient(brokers, listenerName)) { + admin => { + admin.alterClientQuotas(Collections.singleton( + new ClientQuotaAlteration( + new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "") null else sanitizedClientId)).asJava), + configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get() } } - else { - adminZkClient.changeClientIdConfig(sanitizedClientId, configs) - } - } - - /** - * Ensures that the consumer offsets/group metadata topic exists. If it does not, the topic is created and the method waits - * until the leader is elected and metadata is propagated to all brokers. If it does, the method verifies that it has - * the expected number of partition and replication factor however it does not guarantee that the topic is empty. - */ - private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = { - val server = servers.head - val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions - val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt - - try { - TestUtils.createTopic( - zkClient, - Topic.GROUP_METADATA_TOPIC_NAME, - numPartitions, - replicationFactor, - servers, - server.groupCoordinator.groupMetadataTopicConfigs - ) - } catch { - case ex: TopicExistsException => - val allPartitionsMetadata = waitForAllPartitionsMetadata( - servers, - Topic.GROUP_METADATA_TOPIC_NAME, - numPartitions - ) - - // If the topic already exists, we ensure that it has the required - // number of partitions and replication factor. If it has not, the - // exception is thrown further. - if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) { - throw ex - } - } } } diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala index 01d96ef6318..10a32c4a3c1 100644 --- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -51,7 +51,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with @volatile private var running = true - override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnectOrNull) + override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, null) .map(KafkaConfig.fromProps(_, overridingProps)) @BeforeEach @@ -148,7 +148,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with for (t <- topics if running) { try { deleteTopic(t) - TestUtils.verifyTopicDeletion(null, t, partitionNum, servers) + TestUtils.verifyTopicDeletion(t, partitionNum, servers) } catch { case e: Exception => e.printStackTrace() } diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala index 0b7beeaab7a..4af4a49069f 100644 --- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala @@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource class MinIsrConfigTest extends KafkaServerTestHarness { val overridingProps = new Properties() overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "5") - def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, overridingProps)) + def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, null).map(KafkaConfig.fromProps(_, overridingProps)) @ParameterizedTest @ValueSource(strings = Array("kraft")) diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 3d78bf27aff..29714d1856b 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -71,8 +71,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull) - configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull) + configProps1 = createBrokerConfig(brokerId1, null) + configProps2 = createBrokerConfig(brokerId2, null) for (configProps <- List(configProps1, configProps2)) { configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString) diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 1fa12ef990b..451b8cf2f87 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -62,7 +62,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val topic = "test-topic-metric" createTopic(topic) deleteTopic(topic) - TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers) + TestUtils.verifyTopicDeletion(topic, 1, brokers) assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic") } @@ -77,7 +77,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist") brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic))) deleteTopic(topic) - TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers) + TestUtils.verifyTopicDeletion(topic, 1, brokers) assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic") } diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 95a9f92fe86..3a0ffe1b477 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -69,16 +69,9 @@ abstract class BaseRequestTest extends IntegrationTestHarness { /** * Return the socket server where admin request to be sent. * - * For KRaft clusters that is any broker as the broker will forward the request to the active - * controller. For Legacy clusters that is the controller broker. + * KRaft clusters that is any broker as the broker will forward the request to the active controller. */ - def adminSocketServer: SocketServer = { - if (isKRaftTest()) { - anySocketServer - } else { - controllerSocketServer - } - } + def adminSocketServer: SocketServer = anySocketServer def connect(socketServer: SocketServer = anySocketServer, listenerName: ListenerName = listenerName): Socket = { diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala index ec7cdffa1ef..81875ceaac2 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala @@ -40,7 +40,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest { } override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, + val props = TestUtils.createBrokerConfigs(brokerCount, null, enableControlledShutdown = false, enableDeleteTopic = false, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index f2be521fca4..0c9cab50d46 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._ class EdgeCaseRequestTest extends KafkaServerTestHarness { def generateConfigs = { - val props = TestUtils.createBrokerConfig(1, zkConnectOrNull) + val props = TestUtils.createBrokerConfig(1, null) props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false") List(KafkaConfig.fromProps(props)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala index a97cea67c5c..d319a18bc82 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala @@ -71,7 +71,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - val props = TestUtils.createBrokerConfig(1, zkConnectOrNull) + val props = TestUtils.createBrokerConfig(1, null) props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter") props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 156fa60a1d5..baf51347cb2 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -82,7 +82,7 @@ class LogRecoveryTest extends QuorumTestHarness { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + configs = TestUtils.createBrokerConfigs(2, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) // start both servers server1 = createBroker(configProps1) diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 34720797e11..fbd6f75a6a9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -88,7 +88,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { * regular replication works as expected. */ - brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) } + brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, null))) } //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet) //And two extra partitions 6,7, which we don't intend on throttling. @@ -202,7 +202,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { */ //2 brokers with 1MB Segment Size & 1 partition - val config: Properties = createBrokerConfig(100, zkConnectOrNull) + val config: Properties = createBrokerConfig(100, null) config.put("log.segment.bytes", (1024 * 1024).toString) brokers = Seq(createBroker(fromProps(config))) @@ -231,7 +231,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { //Start the new broker (and hence start replicating) debug("Starting new broker") - brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull))) + brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, null))) val start = System.currentTimeMillis() waitForOffsetsToMatch(msgCount, 0, 101) @@ -261,7 +261,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { def createBrokers(brokerIds: Seq[Int]): Unit = { brokerIds.foreach { id => - brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) + brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, null))) } } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index 4ac571f452a..7c7e7fd6e09 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -68,7 +68,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { @ParameterizedTest @ValueSource(strings = Array("kraft")) def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: String): Unit = { - brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) } + brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, null))) } // Given two topics with replication of a single partition for (topic <- List(topic1, topic2)) { @@ -103,7 +103,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = { //3 brokers, put partition on 100/101 and then pretend to be 102 - brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) } + brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, null))) } val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101)) createTopic(topic1, assignment1) @@ -150,10 +150,10 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { @ValueSource(strings = Array("kraft")) def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = { //Setup: we are only interested in the single partition on broker 101 - brokers += createBroker(fromProps(createBrokerConfig(100, zkConnectOrNull))) + brokers += createBroker(fromProps(createBrokerConfig(100, null))) assertEquals(controllerServer.config.nodeId, waitUntilQuorumLeaderElected(controllerServer)) - brokers += createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull))) + brokers += createBroker(fromProps(createBrokerConfig(101, null))) def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b2447e7f7c5..379ba5a532f 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -526,39 +526,6 @@ object TestUtils extends Logging { controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller)) } - /** - * Create a topic in ZooKeeper. - * Wait until the leader is elected and the metadata is propagated to all brokers. - * Return the leader for each partition. - */ - def createTopic(zkClient: KafkaZkClient, - topic: String, - numPartitions: Int = 1, - replicationFactor: Int = 1, - servers: Seq[KafkaBroker], - topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { - val adminZkClient = new AdminZkClient(zkClient) - // create topic - waitUntilTrue( () => { - var hasSessionExpirationException = false - try { - adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig) - } catch { - case _: SessionExpiredException => hasSessionExpirationException = true - case e: Throwable => throw e // let other exceptions propagate - } - !hasSessionExpirationException}, - s"Can't create topic $topic") - - // wait until we've propagated all partitions metadata to all servers - val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, numPartitions) - - (0 until numPartitions).map { i => - i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse( - throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache")) - }.toMap - } - /** * Create a topic in ZooKeeper using a customized replica assignment. * Wait until the leader is elected and the metadata is propagated to all brokers. @@ -698,31 +665,6 @@ object TestUtils extends Logging { new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer) } - /** - * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected. - * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. - * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader. - * - * @return The new leader (note that negative values are used to indicate conditions like NoLeader and - * LeaderDuringDelete). - * @throws AssertionError if the expected condition is not true within the timeout. - */ - def waitUntilLeaderIsElectedOrChanged( - zkClient: KafkaZkClient, - topic: String, - partition: Int, - timeoutMs: Long = 30000L, - oldLeaderOpt: Option[Int] = None, - newLeaderOpt: Option[Int] = None, - ignoreNoLeader: Boolean = false - ): Int = { - def getPartitionLeader(topic: String, partition: Int): Option[Int] = { - zkClient.getLeaderForPartition(new TopicPartition(topic, partition)) - .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NO_LEADER) - } - doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt) - } - /** * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected. * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. @@ -1033,11 +975,6 @@ object TestUtils extends Logging { }, msg) } - def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { - val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined) - controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms")) - } - def awaitLeaderChange[B <: KafkaBroker]( brokers: Seq[B], tp: TopicPartition, @@ -1240,18 +1177,10 @@ object TestUtils extends Logging { } def verifyTopicDeletion[B <: KafkaBroker]( - zkClient: KafkaZkClient, topic: String, numPartitions: Int, brokers: Seq[B]): Unit = { val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) - if (zkClient != null) { - // wait until admin path for delete topic is deleted, signaling completion of topic deletion - waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic), - "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic)) - waitUntilTrue(() => !zkClient.topicExists(topic), - "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic)) - } // ensure that the topic-partition has been deleted from all brokers' replica managers waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)),