From b480135b4f5342ae0c4b4df214efc02a402c610e Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Thu, 3 Oct 2024 01:10:53 +0800 Subject: [PATCH] KAFKA-16974 KRaft support in SslAdminIntegrationTest (#17251) Reviewers: Chia-Ping Tsai --- ...minClientWithPoliciesIntegrationTest.scala | 8 +- .../kafka/api/BaseAdminIntegrationTest.scala | 4 +- .../api/PlaintextAdminIntegrationTest.scala | 343 ++++++------------ .../api/SaslSslAdminIntegrationTest.scala | 51 ++- .../kafka/api/SslAdminIntegrationTest.scala | 110 ++++-- .../kafka/server/QuorumTestHarness.scala | 13 +- .../scala/unit/kafka/utils/TestUtils.scala | 2 +- 7 files changed, 240 insertions(+), 291 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 3b2c8d9739c..6f902b2db3b 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -65,7 +65,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()).asJava override def generateConfigs: collection.Seq[KafkaConfig] = { - val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull) + val configs = TestUtils.createBrokerConfigs(brokerCount, null) configs.foreach(overrideNodeConfigs) configs.map(KafkaConfig.fromProps) } @@ -81,7 +81,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testValidAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -100,7 +100,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testInvalidAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client) @@ -108,7 +108,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with @nowarn("cat=deprecation") @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = { client = Admin.create(createConfig) diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index ab47edc8cd5..b385f9f222a 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -71,7 +71,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreateDeleteTopics(quorum: String): Unit = { client = createAdminClient val topics = Seq("mytopic", "mytopic2", "mytopic3") @@ -164,7 +164,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAuthorizedOperations(quorum: String): Unit = { client = createAdminClient diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d439a59931d..d04c9e6ee22 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter} -import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} +import org.apache.kafka.common.requests.{DeleteRecordsRequest} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.{Time, Utils} @@ -50,7 +50,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS import org.junit.jupiter.api.Assertions._ @@ -98,7 +98,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @Timeout(30) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") @@ -119,7 +119,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @Timeout(30) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = { client = createAdminClient val config = createConfig @@ -141,7 +141,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit = { // Since it's hard to stably reach quota limit in integration test, we only verify quota configs are set correctly val config = createConfig @@ -167,7 +167,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeUserScramCredentials(quorum: String): Unit = { client = createAdminClient @@ -235,7 +235,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @Timeout(10) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeUserScramCredentialsTimeout(quorum: String): Unit = { client = createInvalidAdminClient() try { @@ -266,7 +266,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeProducers(quorum: String): Unit = { client = createAdminClient client.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1.toShort))).all().get() @@ -339,7 +339,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTransactions(quorum: String): Unit = { client = createAdminClient client.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1.toShort))).all().get() @@ -425,7 +425,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @Timeout(10) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTransactionsTimeout(quorum: String): Unit = { client = createInvalidAdminClient() try { @@ -440,7 +440,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @Timeout(10) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAbortTransactionTimeout(quorum: String): Unit = { client = createInvalidAdminClient() try { @@ -454,7 +454,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListTransactions(quorum: String): Unit = { def createTransactionList(): Unit = { client = createAdminClient @@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAbortTransaction(quorum: String): Unit = { client = createAdminClient val tp = new TopicPartition("topic1", 0) @@ -593,7 +593,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testClose(quorum: String): Unit = { val client = createAdminClient client.close() @@ -601,7 +601,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListNodes(quorum: String): Unit = { client = createAdminClient val brokerStrs = bootstrapServers().split(",").toList.sorted @@ -614,7 +614,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000") @@ -629,7 +629,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit = { client = createAdminClient val topic = "mytopic" @@ -646,7 +646,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDeleteTopicsWithIds(quorum: String): Unit = { client = createAdminClient val topics = Seq("mytopic", "mytopic2", "mytopic3") @@ -665,7 +665,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDeleteTopicsWithOptionTimeoutMs(quorum: String): Unit = { client = createInvalidAdminClient() @@ -678,7 +678,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListTopicsWithOptionTimeoutMs(quorum: String): Unit = { client = createInvalidAdminClient() @@ -691,7 +691,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListTopicsWithOptionListInternal(quorum: String): Unit = { client = createAdminClient @@ -700,7 +700,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTopicsWithOptionPartitionSizeLimitPerResponse(quorum: String): Unit = { client = createAdminClient @@ -717,7 +717,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTopicsWithOptionTimeoutMs(quorum: String): Unit = { client = createInvalidAdminClient() @@ -733,7 +733,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * describe should not auto create topics */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeNonExistingTopic(quorum: String): Unit = { client = createAdminClient @@ -745,13 +745,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues() assertEquals(existingTopic, results.get(existingTopic).get.name) assertFutureExceptionTypeEquals(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException]) - if (!isKRaftTest()) { - assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic)) - } } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTopicsWithIds(quorum: String): Unit = { client = createAdminClient @@ -770,7 +767,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeTopicsWithNames(quorum: String): Unit = { client = createAdminClient @@ -785,7 +782,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeCluster(quorum: String): Unit = { client = createAdminClient val result = client.describeCluster @@ -794,14 +791,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId) val controller = result.controller().get() - if (isKRaftTest()) { - // In KRaft, we return a random brokerId as the current controller. - val brokerIds = brokers.map(_.config.brokerId).toSet - assertTrue(brokerIds.contains(controller.id)) - } else { - assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.map(_.id). - getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id) - } + // In KRaft, we return a random brokerId as the current controller. + val brokerIds = brokers.map(_.config.brokerId).toSet + assertTrue(brokerIds.contains(controller.id)) val brokerEndpoints = bootstrapServers().split(",") assertEquals(brokerEndpoints.size, nodes.size) @@ -812,7 +804,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeLogDirs(quorum: String): Unit = { client = createAdminClient val topic = "topic" @@ -844,7 +836,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeReplicaLogDirs(quorum: String): Unit = { client = createAdminClient val topic = "topic" @@ -863,7 +855,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAlterReplicaLogDirs(quorum: String): Unit = { client = createAdminClient val topic = "topic" @@ -953,7 +945,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeConfigsNonexistent(quorum: String): Unit = { client = createAdminClient @@ -988,7 +980,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeAndAlterConfigs(quorum: String): Unit = { client = createAdminClient @@ -1133,7 +1125,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreatePartitions(quorum: String): Unit = { client = createAdminClient @@ -1198,11 +1190,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidPartitionsException when newCount is a decrease") assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc) - var exceptionMsgStr = if (isKRaftTest()) { - "The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase." - } else { - "Topic currently has 3 partitions, which is higher than the requested 1." - } + var exceptionMsgStr = "The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1212,11 +1200,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get, () => s"$desc: Expect InvalidPartitionsException when requesting a noop") assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "Topic already has 3 partition(s)." - } else { - "Topic already has 3 partitions." - } + exceptionMsgStr = "Topic already has 3 partition(s)." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic2, expectedNumPartitionsOpt = Some(3)), desc) @@ -1243,11 +1227,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(unknownTopic).get, () => s"$desc: Expect InvalidTopicException when using an unknown topic") assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "This server does not host this topic-partition." - } else { - "The topic 'an-unknown-topic' does not exist." - } + exceptionMsgStr = "This server does not host this topic-partition." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) // try an invalid newCount @@ -1256,11 +1236,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidPartitionsException when newCount is invalid") assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase." - } else { - "Topic currently has 3 partitions, which is higher than the requested -22." - } + exceptionMsgStr = "The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1271,13 +1247,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "The manual partition assignment includes a partition with 2 replica(s), but this is not " + + exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + "consistent with previous partitions, which have 1 replica(s)." - } else { - "Inconsistent replication factor between partitions, partition 0 has 1 while partitions [3] " + - "have replication factors [2], respectively." - } assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1287,11 +1258,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified." - } else { - "Increasing the number of partitions by 3 but 1 assignments provided." - } + exceptionMsgStr = "Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1300,11 +1267,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option) e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") - exceptionMsgStr = if (isKRaftTest()) { - "Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified." - } else { - "Increasing the number of partitions by 1 but 2 assignments provided." - } + exceptionMsgStr = "Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified." assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1315,11 +1278,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "The manual partition assignment includes the broker 1 more than once." - } else { - "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3." - } + exceptionMsgStr = "The manual partition assignment includes the broker 1 more than once." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1329,13 +1288,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "The manual partition assignment includes a partition with 2 replica(s), but this is not " + + exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + "consistent with previous partitions, which have 1 replica(s)." - } else { - "Inconsistent replication factor between partitions, partition 0 has 1 " + - "while partitions [4] have replication factors [2], respectively." - } assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1345,11 +1299,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "The manual partition assignment includes broker 12, but no such broker is registered." - } else { - "Unknown broker(s) in replica assignment: 12." - } + exceptionMsgStr = "The manual partition assignment includes broker 12, but no such broker is registered." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1359,11 +1309,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) - exceptionMsgStr = if (isKRaftTest()) { - "Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified." - } else { - "Increasing the number of partitions by 1 but 0 assignments provided." - } + exceptionMsgStr = "Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) } @@ -1377,34 +1323,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitForAllPartitionsMetadata(brokers, topic1, expectedNumPartitions = 4) var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get) assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) - val exceptionMsgStr = if (isKRaftTest()) { - "The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase." - } else { - "Topic currently has 3 partitions, which is higher than the requested 2." - } + val exceptionMsgStr = "The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase." assertEquals(exceptionMsgStr, e.getCause.getMessage) TestUtils.waitForAllPartitionsMetadata(brokers, topic2, expectedNumPartitions = 3) - // Delete the topic. Verify addition of partitions to deleted topic is not possible. In - // Zookeeper mode, the topic is queued for deletion. In KRaft, the deletion occurs - // immediately and hence we have a different Exception thrown in the response. + // Delete the topic. Verify addition of partitions to deleted topic is not possible. + // In KRaft, the deletion occurs immediately and hence we have a different Exception thrown in the response. val deleteResult = client.deleteTopics(asList(topic1)) deleteResult.topicNameValues.get(topic1).get alterResult = client.createPartitions(Map(topic1 -> NewPartitions.increaseTo(4)).asJava, validateOnly) e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get, () => "Expect InvalidTopicException or UnknownTopicOrPartitionException when the topic is queued for deletion") - if (isKRaftTest()) { - assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString) - assertEquals("This server does not host this topic-partition.", e.getCause.getMessage) - } else { - assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString) - assertEquals("The topic is queued for deletion.", e.getCause.getMessage) - } + assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString) + assertEquals("This server does not host this topic-partition.", e.getCause.getMessage) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testSeekAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) @@ -1434,7 +1370,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testLogStartOffsetCheckpoint(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) @@ -1474,7 +1410,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) @@ -1495,7 +1431,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = { val leaders = createTopic(topic, replicationFactor = brokerCount) val followerIndex = if (leaders(0) != brokers.head.config.brokerId) 0 else 1 @@ -1544,7 +1480,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = { client = createAdminClient createTopic(topic, replicationFactor = brokerCount) @@ -1577,7 +1513,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) @@ -1600,7 +1536,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testConsumeAfterDeleteRecords(quorum: String): Unit = { val consumer = createConsumer() subscribeAndWaitForAssignment(topic, consumer) @@ -1624,7 +1560,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDeleteRecordsWithException(quorum: String): Unit = { val consumer = createConsumer() subscribeAndWaitForAssignment(topic, consumer) @@ -1644,7 +1580,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeConfigsForTopic(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = createAdminClient @@ -1664,7 +1600,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testIncludeDocumentation(quorum: String): Unit = { createTopic(topic) client = createAdminClient @@ -1700,7 +1636,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testInvalidAlterConfigs(quorum: String): Unit = { client = createAdminClient checkInvalidAlterConfigs(this, client) @@ -1712,7 +1648,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * when the authorizer is enabled. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclOperations(quorum: String): Unit = { val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) @@ -1729,7 +1665,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * since they can be done within the timeout. New calls should receive exceptions. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDelayedClose(quorum: String): Unit = { client = createAdminClient val topics = Seq("mytopic", "mytopic2") @@ -1747,7 +1683,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testForceClose(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") @@ -1765,7 +1701,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * even when the default request timeout is shorter. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testMinimumRequestTimeouts(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") @@ -1783,7 +1719,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Test injecting timeouts for calls that are in flight. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCallInFlightTimeouts(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000") @@ -1803,7 +1739,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Test the consumer group APIs. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testConsumerGroups(quorum: String): Unit = { val config = createConfig client = Admin.create(config) @@ -2072,7 +2008,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDeleteConsumerGroupOffsets(quorum: String): Unit = { val config = createConfig client = Admin.create(config) @@ -2295,7 +2231,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectPreferredLeaders(quorum: String): Unit = { client = createAdminClient @@ -2380,11 +2316,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) - if (isKRaftTest()) { - assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage) - } else { - assertEquals("The partition does not exist.", exception.getMessage) - } + assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage) } // unknown topic @@ -2423,15 +2355,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) - if (isKRaftTest()) { - assertTrue(exception.getMessage.contains( - "The preferred leader was not available."), - s"Unexpected message: ${exception.getMessage}") - } else { - assertTrue(exception.getMessage.contains( - s"Failed to elect leader for partition $topicPartition under strategy PreferredReplicaPartitionLeaderElectionStrategy"), - s"Unexpected message: ${exception.getMessage}") - } + assertTrue(exception.getMessage.contains( + "The preferred leader was not available."), + s"Unexpected message: ${exception.getMessage}") } // ... now what happens if we try to elect the preferred leader and it's down? @@ -2454,7 +2380,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersForOnePartition(quorum: String): Unit = { // Case: unclean leader election with one topic partition client = createAdminClient @@ -2482,7 +2408,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = { // Case: unclean leader election with many topic partitions client = createAdminClient @@ -2522,7 +2448,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = { // Case: noop unclean leader election and valid unclean leader election for all partitions client = createAdminClient @@ -2562,7 +2488,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = { // Case: unclean leader election for unknown topic client = createAdminClient @@ -2588,7 +2514,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = { // Case: unclean leader election with no live brokers client = createAdminClient @@ -2617,7 +2543,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersNoop(quorum: String): Unit = { // Case: noop unclean leader election with explicit topic partitions client = createAdminClient @@ -2645,7 +2571,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testElectUncleanLeadersAndNoop(quorum: String): Unit = { // Case: one noop unclean leader election and one valid unclean leader election client = createAdminClient @@ -2685,7 +2611,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum: String): Unit = { client = createAdminClient @@ -2702,7 +2628,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit = { client = createAdminClient @@ -2717,7 +2643,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testValidIncrementalAlterConfigs(quorum: String): Unit = { client = createAdminClient @@ -2827,17 +2753,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { topic1Resource -> topic1AlterConfigs ).asJava, new AlterConfigsOptions().validateOnly(true)) - if (isKRaftTest()) { - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], - Some("Invalid value zip for configuration compression.type")) - } else { - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], - Some("Invalid config value for resource")) - } + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], + Some("Invalid value zip for configuration compression.type")) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = { client = createAdminClient @@ -2878,7 +2799,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String): Unit = { client = createAdminClient val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") @@ -2912,7 +2833,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = { client = createAdminClient val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") @@ -2949,7 +2870,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testInvalidIncrementalAlterConfigs(quorum: String): Unit = { client = createAdminClient @@ -3013,18 +2934,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], - if (isKRaftTest()) { - Some("Can't APPEND to key compression.type because its type is not LIST.") - } else { - Some("Config value append is not allowed for config") - }) + Some("Can't APPEND to key compression.type because its type is not LIST.")) assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException], - if (isKRaftTest()) { - Some("Can't SUBTRACT to key compression.type because its type is not LIST.") - } else { - Some("Config value subtract is not allowed for config") - }) + Some("Can't SUBTRACT to key compression.type because its type is not LIST.")) // Try to add invalid config topic1AlterConfigs = Seq( @@ -3037,15 +2950,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet) assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], - if (isKRaftTest()) { - Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1") - } else { - Some("Invalid config value for resource") - }) + Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1")) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testInvalidAlterPartitionReassignments(quorum: String): Unit = { client = createAdminClient val topic = "alter-reassignments-topic-1" @@ -3085,7 +2994,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testLongTopicNames(quorum: String): Unit = { val client = createAdminClient val longTopicName = String.join("", Collections.nCopies(249, "x")); @@ -3105,17 +3014,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Verify that createTopics and alterConfigs fail with null values @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testNullConfigs(quorum: String): Unit = { def validateLogConfig(compressionType: String): Unit = { - val logConfig = if (isKRaftTest()) { - ensureConsistentKRaftMetadata() - val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic) - LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps) - } else { - zkClient.getLogConfigs(Set(topic), Collections.emptyMap[String, AnyRef])._1(topic) - } + ensureConsistentKRaftMetadata() + val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic) + val logConfig = LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps) assertEquals(compressionType, logConfig.originals.get(TopicConfig.COMPRESSION_TYPE_CONFIG)) assertNull(logConfig.originals.get(TopicConfig.RETENTION_BYTES_CONFIG)) @@ -3153,7 +3058,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = { client = createAdminClient LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") @@ -3170,7 +3075,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = { client = createAdminClient @@ -3235,7 +3140,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level) */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = { client = createAdminClient @@ -3278,8 +3183,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - @Disabled // Zk to be re-enabled once KAFKA-8779 is resolved + @ValueSource(strings = Array("kraft")) + @Disabled // to be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = { client = createAdminClient val deleteRootLoggerEntry = Seq( @@ -3290,7 +3195,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = { client = createAdminClient @@ -3335,7 +3240,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @nowarn("cat=deprecation") @ParameterizedTest - @ValueSource(strings = Array("kraft")) // Zk to be re-enabled once KAFKA-8779 is resolved + @ValueSource(strings = Array("kraft")) def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = { client = createAdminClient @@ -3382,13 +3287,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = { testAppendConfig(new Properties(), "0:0", "0:0") } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAppendConfigToExistentValue(ignored: String): Unit = { val props = new Properties() props.setProperty(QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1") @@ -3450,7 +3355,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * BaseAdminIntegrationTest.modifyConfigs. */ @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreateTopicsReturnsConfigs(quorum: String): Unit = { client = Admin.create(super.createConfig) @@ -3464,17 +3369,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true) .all().get(15, TimeUnit.SECONDS) - if (isKRaftTest()) { - // In KRaft mode, we don't yet support altering configs on controller nodes, except by setting - // default node configs. Therefore, we have to set the dynamic config directly to test this. - val controllerNodeResource = new ConfigResource(ConfigResource.Type.BROKER, - controllerServer.config.nodeId.toString) - controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, - Collections.singletonMap(controllerNodeResource, - Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, - new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get() - ensureConsistentKRaftMetadata() - } + // In KRaft mode, we don't yet support altering configs on controller nodes, except by setting + // default node configs. Therefore, we have to set the dynamic config directly to test this. + val controllerNodeResource = new ConfigResource(ConfigResource.Type.BROKER, + controllerServer.config.nodeId.toString) + controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, + Collections.singletonMap(controllerNodeResource, + Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, + new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get() + ensureConsistentKRaftMetadata() waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault( CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "").toString.equals("34")), @@ -3526,11 +3429,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { topicConfigs.get(TopicConfig.SEGMENT_JITTER_MS_CONFIG)) // From static broker config by the synonym LogRollTimeHoursProp. - val segmentMsPropType = if (isKRaftTest()) { - ConfigSource.STATIC_BROKER_CONFIG - } else { - ConfigSource.DEFAULT_CONFIG - } + val segmentMsPropType = ConfigSource.STATIC_BROKER_CONFIG assertEquals(new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, "7200000", segmentMsPropType, false, false, Collections.emptyList(), null, null), topicConfigs.get(TopicConfig.SEGMENT_MS_CONFIG)) @@ -3636,7 +3535,7 @@ object PlaintextAdminIntegrationTest { var topicConfigEntries2 = Seq(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString) - val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")).asJava + val brokerConfigEntries = Seq(new ConfigEntry(ServerConfigs.BROKER_ID_CONFIG, "10")).asJava // Alter configs: first and third are invalid, second is valid var alterResult = admin.alterConfigs(Map( diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index bda617eaeac..35c11382ffc 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -14,9 +14,8 @@ package kafka.api import java.time import kafka.security.JaasTestUtils -import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils._ -import kafka.utils.{TestInfoUtils, TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.admin._ import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ @@ -31,7 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.SecurityUtils import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING} -import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ZkConfigs} +import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs} import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ @@ -50,7 +49,6 @@ import scala.util.{Failure, Success, Try} @Timeout(120) class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) - val zkAuthorizerClassName = classOf[AclAuthorizer].getName val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME) var superUserAdmin: Admin = _ @@ -61,18 +59,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - // set this to use delegation token - if (TestInfoUtils.isKRaft(testInfo)) { - this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) - this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) - // controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user - this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString) - this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString) - } else { - this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, zkAuthorizerClassName) - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") - this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) - } + this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) + this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) + // controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user + this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString) + this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString) // Enable delegationTokenControlManager this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, secretKey) @@ -140,7 +131,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @ParameterizedTest @Timeout(30) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = { val config = createConfig // this will cause timeout connecting to broker @@ -159,7 +150,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @ParameterizedTest @Timeout(30) - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = { val config = createConfig // this will cause timeout connecting to broker @@ -175,7 +166,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk","kraft")) + @ValueSource(strings = Array("kraft")) def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): Unit = { client = createAdminClient val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer")) @@ -211,7 +202,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclOperations(quorum: String): Unit = { client = createAdminClient val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), @@ -233,7 +224,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclOperations2(quorum: String): Unit = { client = createAdminClient val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) @@ -260,7 +251,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclDescribe(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) @@ -288,7 +279,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclDelete(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) @@ -356,7 +347,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testLegacyAclOpsNeverAffectOrReturnPrefixed(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned. @@ -402,7 +393,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAttemptToCreateInvalidAcls(quorum: String): Unit = { client = createAdminClient val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL), @@ -416,7 +407,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { client = createAdminClient val timeout = Long.MaxValue @@ -429,7 +420,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = { client = createAdminClient val timeout = -1 @@ -528,7 +519,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testAclAuthorizationDenied(quorum: String): Unit = { client = createAdminClient @@ -578,7 +569,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testCreateTopicsResponseMetadataAndConfig(quorum: String): Unit = { val topic1 = "mytopic1" val topic2 = "mytopic2" @@ -639,7 +630,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("kraft")) def testExpireDelegationToken(quorum: String): Unit = { client = createAdminClient val createDelegationTokenOptions = new CreateDelegationTokenOptions() diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index bc6bd972f3a..5d365e4c05b 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -17,24 +17,28 @@ import java.util.concurrent._ import java.util.Properties import com.yammer.metrics.core.Gauge import kafka.security.JaasTestUtils -import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.CreateAclsResult +import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult} import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SecurityProtocol, SslAuthenticationContext} import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.server.authorizer._ import org.apache.kafka.common.network.ConnectionMode +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.{AfterEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ -import scala.collection.mutable +import scala.collection.{Seq, mutable} import scala.compat.java8.OptionConverters object SslAdminIntegrationTest { @@ -45,7 +49,7 @@ object SslAdminIntegrationTest { val serverUser = "server" val clientCn = "client" - class TestableAclAuthorizer extends AclAuthorizer { + class TestableStandardAuthorizer extends StandardAuthorizer with ClusterMetadataAuthorizer { override def createAcls(requestContext: AuthorizableRequestContext, aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = { lastUpdateRequestContext = Some(requestContext) @@ -65,11 +69,10 @@ object SslAdminIntegrationTest { semaphore.foreach(_.acquire()) try { action.apply().asScala.zip(futures).foreach { case (baseFuture, resultFuture) => - baseFuture.whenComplete { (result, exception) => - if (exception != null) - resultFuture.completeExceptionally(exception) - else - resultFuture.complete(result) + try { + resultFuture.complete(baseFuture.toCompletableFuture.get()) + } catch { + case e: Throwable => resultFuture.completeExceptionally(e) } } } finally { @@ -110,19 +113,39 @@ object SslAdminIntegrationTest { } class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { - override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName + override val kraftAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableStandardAuthorizer].getName this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) override protected def securityProtocol = SecurityProtocol.SSL override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser) + private val extraControllerSecurityProtocol = SecurityProtocol.SSL + override def setUpSasl(): Unit = { SslAdminIntegrationTest.semaphore = None SslAdminIntegrationTest.executor = None SslAdminIntegrationTest.lastUpdateRequestContext = None - startSasl(jaasSections(List.empty, None, ZkSasl)) + startSasl(jaasSections(List.empty, None, KafkaSasl)) + } + + override def createConfig: util.Map[String, Object] = { + val config = new util.HashMap[String, Object] + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000") + config + } + + override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = { + val configs = super.kraftControllerConfigs(testInfo) + JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false, OptionConverters.toJava(trustStoreFile), s"controller") + .forEach((k, v) => configs.foreach(c => c.put(k, v))) + configs + } + + override def extraControllerSecurityProtocols(): Seq[SecurityProtocol] = { + Seq(extraControllerSecurityProtocol) } @AfterEach @@ -135,13 +158,15 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { super.tearDown() } - @Test - def testAclUpdatesUsingSynchronousAuthorizer(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = { verifyAclUpdates() } - @Test - def testAclUpdatesUsingAsynchronousAuthorizer(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAclUpdatesUsingAsynchronousAuthorizer(quorum: String): Unit = { SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor) verifyAclUpdates() } @@ -150,31 +175,36 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { * Verify that ACL updates using synchronous authorizer are performed synchronously * on request threads without any performance overhead introduced by a purgatory. */ - @Test - def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(quorum: String): Unit = { val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) waitForNoBlockedRequestThreads() + useBoostrapControllers() // Queue requests until all threads are blocked. ACL create requests are sent to least loaded // node, so we may need more than `numRequestThreads` requests to block all threads. val aclFutures = mutable.Buffer[CreateAclsResult]() - while (blockedRequestThreads.size < numRequestThreads) { + // In KRaft mode, ACL creation is handled exclusively by controller servers, not brokers. + // Therefore, only the number of controller I/O threads is relevant in this context. + val numReqThreads = controllerServers.head.config.numIoThreads * controllerServers.size + while (blockedRequestThreads.size < numReqThreads) { aclFutures += createAdminClient.createAcls(List(acl2).asJava) - assertTrue(aclFutures.size < numRequestThreads * 10, - s"Request threads not blocked numRequestThreads=$numRequestThreads blocked=$blockedRequestThreads") + assertTrue(aclFutures.size < numReqThreads * 10, + s"Request threads not blocked numRequestThreads=$numReqThreads blocked=$blockedRequestThreads aclFutures=${aclFutures.size}") } assertEquals(0, purgatoryMetric("NumDelayedOperations")) assertEquals(0, purgatoryMetric("PurgatorySize")) // Verify that operations on other clients are blocked - val describeFuture = createAdminClient.describeCluster().clusterId() - assertFalse(describeFuture.isDone) + val listPartitionReassignmentsFuture = createAdminClient.listPartitionReassignments().reassignments() + assertFalse(listPartitionReassignmentsFuture.isDone) // Release the semaphore and verify that all requests complete testSemaphore.release(aclFutures.size) waitForNoBlockedRequestThreads() - assertNotNull(describeFuture.get(10, TimeUnit.SECONDS)) + assertNotNull(listPartitionReassignmentsFuture.get(10, TimeUnit.SECONDS)) // If any of the requests time out since we were blocking the threads earlier, retry the request. val numTimedOut = aclFutures.count { future => try { @@ -197,19 +227,25 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { * Verify that ACL updates using an asynchronous authorizer are completed asynchronously * using a purgatory, enabling other requests to be processed even when ACL updates are blocked. */ - @Test - def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(quorum: String): Unit = { SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor) val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) waitForNoBlockedRequestThreads() - val aclFutures = (0 until numRequestThreads).map(_ => createAdminClient.createAcls(List(acl2).asJava)) + useBoostrapControllers() + // In KRaft mode, ACL creation is handled exclusively by controller servers, not brokers. + // Therefore, only the number of controller I/O threads is relevant in this context. + val numReqThreads = controllerServers.head.config.numIoThreads * controllerServers.size + val aclFutures = (0 until numReqThreads).map(_ => createAdminClient.createAcls(List(acl2).asJava)) + waitForNoBlockedRequestThreads() assertTrue(aclFutures.forall(future => !future.all.isDone)) // Other requests should succeed even though ACL updates are blocked - assertNotNull(createAdminClient.describeCluster().clusterId().get(10, TimeUnit.SECONDS)) + assertNotNull(createAdminClient.listPartitionReassignments().reassignments().get(10, TimeUnit.SECONDS)) TestUtils.waitUntilTrue(() => purgatoryMetric("PurgatorySize") > 0, "PurgatorySize metrics not updated") TestUtils.waitUntilTrue(() => purgatoryMetric("NumDelayedOperations") > 0, "NumDelayedOperations metrics not updated") @@ -238,6 +274,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) + useBoostrapControllers() client = createAdminClient val results = client.createAcls(List(acl2, acl3).asJava).values assertEquals(Set(acl2, acl3), results.keySet().asScala) @@ -267,7 +304,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { requestThreads.filter(_.getState == Thread.State.WAITING).toList } - private def numRequestThreads = servers.head.config.numIoThreads * servers.size + private def numRequestThreads = { + brokers.head.config.numIoThreads * (brokers.size + controllerServers.size) + } private def waitForNoBlockedRequestThreads(): Unit = { val (blockedThreads, _) = TestUtils.computeUntilTrue(blockedRequestThreads)(_.isEmpty) @@ -298,4 +337,17 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) props } + + private def useBoostrapControllers(): Unit = { + val controllerListenerName = ListenerName.forSecurityProtocol(extraControllerSecurityProtocol) + val config = controllerServers.map { s => + val listener = s.config.effectiveAdvertisedControllerListeners + .find(_.listenerName == controllerListenerName) + .getOrElse(throw new IllegalArgumentException(s"Could not find listener with name $controllerListenerName")) + Utils.formatAddress(listener.host, s.socketServer.boundPort(controllerListenerName)) + }.mkString(",") + + adminClientConfig.remove(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG) + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, config) + } } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 81353be9249..f98edab1854 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -314,6 +314,10 @@ abstract class QuorumTestHarness extends Logging { newKRaftQuorum(new Properties()) } + protected def extraControllerSecurityProtocols(): Seq[SecurityProtocol] = { + Seq.empty + } + protected def newKRaftQuorum(overridingProps: Properties): KRaftQuorumImplementation = { val propsList = kraftControllerConfigs(testInfo) if (propsList.size != 1) { @@ -331,9 +335,12 @@ abstract class QuorumTestHarness extends Logging { val metadataDir = TestUtils.tempDir() props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath) val proto = controllerListenerSecurityProtocol.toString - props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"CONTROLLER:$proto") - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0") - props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",") + val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",") + val listenerNames = extraControllerSecurityProtocols().mkString(",") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"CONTROLLER:$proto,$securityProtocolMaps") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0,$listeners") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, s"CONTROLLER,$listenerNames") props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0") // Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent. props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a833258836a..326bb1ed918 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -399,7 +399,7 @@ object TestUtils extends Logging { ): Admin = { val adminClientProperties = new Properties() adminClientProperties.putAll(adminConfig) - if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) { + if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG) && !adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)) { adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(brokers, listenerName)) } Admin.create(adminClientProperties)