KAFKA-16974 KRaft support in SslAdminIntegrationTest (#17251)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-10-03 01:10:53 +08:00 committed by GitHub
parent 540fb91103
commit b480135b4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 240 additions and 291 deletions

View File

@ -65,7 +65,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()).asJava
override def generateConfigs: collection.Seq[KafkaConfig] = {
val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
val configs = TestUtils.createBrokerConfigs(brokerCount, null)
configs.foreach(overrideNodeConfigs)
configs.map(KafkaConfig.fromProps)
}
@ -81,7 +81,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testValidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@ -100,7 +100,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
@ -108,7 +108,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
@nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
client = Admin.create(createConfig)

View File

@ -71,7 +71,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateDeleteTopics(quorum: String): Unit = {
client = createAdminClient
val topics = Seq("mytopic", "mytopic2", "mytopic3")
@ -164,7 +164,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAuthorizedOperations(quorum: String): Unit = {
client = createAdminClient

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.requests.{DeleteRecordsRequest}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
@ -50,7 +50,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.junit.jupiter.api.Assertions._
@ -98,7 +98,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest
@Timeout(30)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
@ -119,7 +119,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest
@Timeout(30)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
client = createAdminClient
val config = createConfig
@ -141,7 +141,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit = {
// Since it's hard to stably reach quota limit in integration test, we only verify quota configs are set correctly
val config = createConfig
@ -167,7 +167,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeUserScramCredentials(quorum: String): Unit = {
client = createAdminClient
@ -235,7 +235,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest
@Timeout(10)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeUserScramCredentialsTimeout(quorum: String): Unit = {
client = createInvalidAdminClient()
try {
@ -266,7 +266,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeProducers(quorum: String): Unit = {
client = createAdminClient
client.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1.toShort))).all().get()
@ -339,7 +339,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTransactions(quorum: String): Unit = {
client = createAdminClient
client.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1.toShort))).all().get()
@ -425,7 +425,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest
@Timeout(10)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTransactionsTimeout(quorum: String): Unit = {
client = createInvalidAdminClient()
try {
@ -440,7 +440,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest
@Timeout(10)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAbortTransactionTimeout(quorum: String): Unit = {
client = createInvalidAdminClient()
try {
@ -454,7 +454,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListTransactions(quorum: String): Unit = {
def createTransactionList(): Unit = {
client = createAdminClient
@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAbortTransaction(quorum: String): Unit = {
client = createAdminClient
val tp = new TopicPartition("topic1", 0)
@ -593,7 +593,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testClose(quorum: String): Unit = {
val client = createAdminClient
client.close()
@ -601,7 +601,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListNodes(quorum: String): Unit = {
client = createAdminClient
val brokerStrs = bootstrapServers().split(",").toList.sorted
@ -614,7 +614,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000")
@ -629,7 +629,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit = {
client = createAdminClient
val topic = "mytopic"
@ -646,7 +646,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDeleteTopicsWithIds(quorum: String): Unit = {
client = createAdminClient
val topics = Seq("mytopic", "mytopic2", "mytopic3")
@ -665,7 +665,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDeleteTopicsWithOptionTimeoutMs(quorum: String): Unit = {
client = createInvalidAdminClient()
@ -678,7 +678,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListTopicsWithOptionTimeoutMs(quorum: String): Unit = {
client = createInvalidAdminClient()
@ -691,7 +691,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListTopicsWithOptionListInternal(quorum: String): Unit = {
client = createAdminClient
@ -700,7 +700,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTopicsWithOptionPartitionSizeLimitPerResponse(quorum: String): Unit = {
client = createAdminClient
@ -717,7 +717,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTopicsWithOptionTimeoutMs(quorum: String): Unit = {
client = createInvalidAdminClient()
@ -733,7 +733,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* describe should not auto create topics
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeNonExistingTopic(quorum: String): Unit = {
client = createAdminClient
@ -745,13 +745,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues()
assertEquals(existingTopic, results.get(existingTopic).get.name)
assertFutureExceptionTypeEquals(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException])
if (!isKRaftTest()) {
assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTopicsWithIds(quorum: String): Unit = {
client = createAdminClient
@ -770,7 +767,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeTopicsWithNames(quorum: String): Unit = {
client = createAdminClient
@ -785,7 +782,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeCluster(quorum: String): Unit = {
client = createAdminClient
val result = client.describeCluster
@ -794,14 +791,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId)
val controller = result.controller().get()
if (isKRaftTest()) {
// In KRaft, we return a random brokerId as the current controller.
val brokerIds = brokers.map(_.config.brokerId).toSet
assertTrue(brokerIds.contains(controller.id))
} else {
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.map(_.id).
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
}
// In KRaft, we return a random brokerId as the current controller.
val brokerIds = brokers.map(_.config.brokerId).toSet
assertTrue(brokerIds.contains(controller.id))
val brokerEndpoints = bootstrapServers().split(",")
assertEquals(brokerEndpoints.size, nodes.size)
@ -812,7 +804,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeLogDirs(quorum: String): Unit = {
client = createAdminClient
val topic = "topic"
@ -844,7 +836,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeReplicaLogDirs(quorum: String): Unit = {
client = createAdminClient
val topic = "topic"
@ -863,7 +855,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirs(quorum: String): Unit = {
client = createAdminClient
val topic = "topic"
@ -953,7 +945,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeConfigsNonexistent(quorum: String): Unit = {
client = createAdminClient
@ -988,7 +980,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeAndAlterConfigs(quorum: String): Unit = {
client = createAdminClient
@ -1133,7 +1125,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreatePartitions(quorum: String): Unit = {
client = createAdminClient
@ -1198,11 +1190,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is a decrease")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
var exceptionMsgStr = if (isKRaftTest()) {
"The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase."
} else {
"Topic currently has 3 partitions, which is higher than the requested 1."
}
var exceptionMsgStr = "The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1212,11 +1200,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get,
() => s"$desc: Expect InvalidPartitionsException when requesting a noop")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"Topic already has 3 partition(s)."
} else {
"Topic already has 3 partitions."
}
exceptionMsgStr = "Topic already has 3 partition(s)."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, expectedNumPartitionsOpt = Some(3)), desc)
@ -1243,11 +1227,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(unknownTopic).get,
() => s"$desc: Expect InvalidTopicException when using an unknown topic")
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"This server does not host this topic-partition."
} else {
"The topic 'an-unknown-topic' does not exist."
}
exceptionMsgStr = "This server does not host this topic-partition."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
// try an invalid newCount
@ -1256,11 +1236,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is invalid")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase."
} else {
"Topic currently has 3 partitions, which is higher than the requested -22."
}
exceptionMsgStr = "The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase."
assertEquals(exceptionMsgStr, e.getCause.getMessage,
desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1271,13 +1247,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"The manual partition assignment includes a partition with 2 replica(s), but this is not " +
exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
"consistent with previous partitions, which have 1 replica(s)."
} else {
"Inconsistent replication factor between partitions, partition 0 has 1 while partitions [3] " +
"have replication factors [2], respectively."
}
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1287,11 +1258,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified."
} else {
"Increasing the number of partitions by 3 but 1 assignments provided."
}
exceptionMsgStr = "Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1300,11 +1267,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
exceptionMsgStr = if (isKRaftTest()) {
"Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified."
} else {
"Increasing the number of partitions by 1 but 2 assignments provided."
}
exceptionMsgStr = "Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified."
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1315,11 +1278,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"The manual partition assignment includes the broker 1 more than once."
} else {
"Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3."
}
exceptionMsgStr = "The manual partition assignment includes the broker 1 more than once."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1329,13 +1288,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"The manual partition assignment includes a partition with 2 replica(s), but this is not " +
exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
"consistent with previous partitions, which have 1 replica(s)."
} else {
"Inconsistent replication factor between partitions, partition 0 has 1 " +
"while partitions [4] have replication factors [2], respectively."
}
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1345,11 +1299,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"The manual partition assignment includes broker 12, but no such broker is registered."
} else {
"Unknown broker(s) in replica assignment: 12."
}
exceptionMsgStr = "The manual partition assignment includes broker 12, but no such broker is registered."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
@ -1359,11 +1309,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
exceptionMsgStr = if (isKRaftTest()) {
"Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified."
} else {
"Increasing the number of partitions by 1 but 0 assignments provided."
}
exceptionMsgStr = "Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified."
assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc)
}
@ -1377,34 +1323,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.waitForAllPartitionsMetadata(brokers, topic1, expectedNumPartitions = 4)
var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
val exceptionMsgStr = if (isKRaftTest()) {
"The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase."
} else {
"Topic currently has 3 partitions, which is higher than the requested 2."
}
val exceptionMsgStr = "The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase."
assertEquals(exceptionMsgStr, e.getCause.getMessage)
TestUtils.waitForAllPartitionsMetadata(brokers, topic2, expectedNumPartitions = 3)
// Delete the topic. Verify addition of partitions to deleted topic is not possible. In
// Zookeeper mode, the topic is queued for deletion. In KRaft, the deletion occurs
// immediately and hence we have a different Exception thrown in the response.
// Delete the topic. Verify addition of partitions to deleted topic is not possible.
// In KRaft, the deletion occurs immediately and hence we have a different Exception thrown in the response.
val deleteResult = client.deleteTopics(asList(topic1))
deleteResult.topicNameValues.get(topic1).get
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4)).asJava, validateOnly)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => "Expect InvalidTopicException or UnknownTopicOrPartitionException when the topic is queued for deletion")
if (isKRaftTest()) {
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString)
assertEquals("This server does not host this topic-partition.", e.getCause.getMessage)
} else {
assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString)
assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
}
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString)
assertEquals("This server does not host this topic-partition.", e.getCause.getMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testSeekAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
@ -1434,7 +1370,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testLogStartOffsetCheckpoint(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
@ -1474,7 +1410,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
@ -1495,7 +1431,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
val leaders = createTopic(topic, replicationFactor = brokerCount)
val followerIndex = if (leaders(0) != brokers.head.config.brokerId) 0 else 1
@ -1544,7 +1480,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = {
client = createAdminClient
createTopic(topic, replicationFactor = brokerCount)
@ -1577,7 +1513,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
@ -1600,7 +1536,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testConsumeAfterDeleteRecords(quorum: String): Unit = {
val consumer = createConsumer()
subscribeAndWaitForAssignment(topic, consumer)
@ -1624,7 +1560,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDeleteRecordsWithException(quorum: String): Unit = {
val consumer = createConsumer()
subscribeAndWaitForAssignment(topic, consumer)
@ -1644,7 +1580,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeConfigsForTopic(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = createAdminClient
@ -1664,7 +1600,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testIncludeDocumentation(quorum: String): Unit = {
createTopic(topic)
client = createAdminClient
@ -1700,7 +1636,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigs(quorum: String): Unit = {
client = createAdminClient
checkInvalidAlterConfigs(this, client)
@ -1712,7 +1648,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* when the authorizer is enabled.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclOperations(quorum: String): Unit = {
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
@ -1729,7 +1665,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* since they can be done within the timeout. New calls should receive exceptions.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDelayedClose(quorum: String): Unit = {
client = createAdminClient
val topics = Seq("mytopic", "mytopic2")
@ -1747,7 +1683,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testForceClose(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
@ -1765,7 +1701,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* even when the default request timeout is shorter.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testMinimumRequestTimeouts(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
@ -1783,7 +1719,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* Test injecting timeouts for calls that are in flight.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCallInFlightTimeouts(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
@ -1803,7 +1739,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* Test the consumer group APIs.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testConsumerGroups(quorum: String): Unit = {
val config = createConfig
client = Admin.create(config)
@ -2072,7 +2008,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDeleteConsumerGroupOffsets(quorum: String): Unit = {
val config = createConfig
client = Admin.create(config)
@ -2295,7 +2231,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectPreferredLeaders(quorum: String): Unit = {
client = createAdminClient
@ -2380,11 +2316,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
): Unit = {
val exception = result.partitions.get.get(topicPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
if (isKRaftTest()) {
assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage)
} else {
assertEquals("The partition does not exist.", exception.getMessage)
}
assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage)
}
// unknown topic
@ -2423,15 +2355,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
): Unit = {
val exception = result.partitions.get.get(topicPartition).get
assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
if (isKRaftTest()) {
assertTrue(exception.getMessage.contains(
"The preferred leader was not available."),
s"Unexpected message: ${exception.getMessage}")
} else {
assertTrue(exception.getMessage.contains(
s"Failed to elect leader for partition $topicPartition under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
s"Unexpected message: ${exception.getMessage}")
}
assertTrue(exception.getMessage.contains(
"The preferred leader was not available."),
s"Unexpected message: ${exception.getMessage}")
}
// ... now what happens if we try to elect the preferred leader and it's down?
@ -2454,7 +2380,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersForOnePartition(quorum: String): Unit = {
// Case: unclean leader election with one topic partition
client = createAdminClient
@ -2482,7 +2408,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = {
// Case: unclean leader election with many topic partitions
client = createAdminClient
@ -2522,7 +2448,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = {
// Case: noop unclean leader election and valid unclean leader election for all partitions
client = createAdminClient
@ -2562,7 +2488,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = {
// Case: unclean leader election for unknown topic
client = createAdminClient
@ -2588,7 +2514,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = {
// Case: unclean leader election with no live brokers
client = createAdminClient
@ -2617,7 +2543,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersNoop(quorum: String): Unit = {
// Case: noop unclean leader election with explicit topic partitions
client = createAdminClient
@ -2645,7 +2571,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testElectUncleanLeadersAndNoop(quorum: String): Unit = {
// Case: one noop unclean leader election and one valid unclean leader election
client = createAdminClient
@ -2685,7 +2611,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum: String): Unit = {
client = createAdminClient
@ -2702,7 +2628,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit = {
client = createAdminClient
@ -2717,7 +2643,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testValidIncrementalAlterConfigs(quorum: String): Unit = {
client = createAdminClient
@ -2827,17 +2753,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
if (isKRaftTest()) {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
Some("Invalid value zip for configuration compression.type"))
} else {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
Some("Invalid config value for resource"))
}
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
Some("Invalid value zip for configuration compression.type"))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = {
client = createAdminClient
@ -2878,7 +2799,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String): Unit = {
client = createAdminClient
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
@ -2912,7 +2833,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = {
client = createAdminClient
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
@ -2949,7 +2870,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testInvalidIncrementalAlterConfigs(quorum: String): Unit = {
client = createAdminClient
@ -3013,18 +2934,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
if (isKRaftTest()) {
Some("Can't APPEND to key compression.type because its type is not LIST.")
} else {
Some("Config value append is not allowed for config")
})
Some("Can't APPEND to key compression.type because its type is not LIST."))
assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException],
if (isKRaftTest()) {
Some("Can't SUBTRACT to key compression.type because its type is not LIST.")
} else {
Some("Config value subtract is not allowed for config")
})
Some("Can't SUBTRACT to key compression.type because its type is not LIST."))
// Try to add invalid config
topic1AlterConfigs = Seq(
@ -3037,15 +2950,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
if (isKRaftTest()) {
Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1")
} else {
Some("Invalid config value for resource")
})
Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1"))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testInvalidAlterPartitionReassignments(quorum: String): Unit = {
client = createAdminClient
val topic = "alter-reassignments-topic-1"
@ -3085,7 +2994,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testLongTopicNames(quorum: String): Unit = {
val client = createAdminClient
val longTopicName = String.join("", Collections.nCopies(249, "x"));
@ -3105,17 +3014,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Verify that createTopics and alterConfigs fail with null values
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testNullConfigs(quorum: String): Unit = {
def validateLogConfig(compressionType: String): Unit = {
val logConfig = if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic)
LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps)
} else {
zkClient.getLogConfigs(Set(topic), Collections.emptyMap[String, AnyRef])._1(topic)
}
ensureConsistentKRaftMetadata()
val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic)
val logConfig = LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps)
assertEquals(compressionType, logConfig.originals.get(TopicConfig.COMPRESSION_TYPE_CONFIG))
assertNull(logConfig.originals.get(TopicConfig.RETENTION_BYTES_CONFIG))
@ -3153,7 +3058,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = {
client = createAdminClient
LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger")
@ -3170,7 +3075,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
client = createAdminClient
@ -3235,7 +3140,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level)
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = {
client = createAdminClient
@ -3278,8 +3183,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@Disabled // Zk to be re-enabled once KAFKA-8779 is resolved
@ValueSource(strings = Array("kraft"))
@Disabled // to be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = {
client = createAdminClient
val deleteRootLoggerEntry = Seq(
@ -3290,7 +3195,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = {
client = createAdminClient
@ -3335,7 +3240,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("kraft")) // Zk to be re-enabled once KAFKA-8779 is resolved
@ValueSource(strings = Array("kraft"))
def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
client = createAdminClient
@ -3382,13 +3287,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
testAppendConfig(new Properties(), "0:0", "0:0")
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAppendConfigToExistentValue(ignored: String): Unit = {
val props = new Properties()
props.setProperty(QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1")
@ -3450,7 +3355,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* BaseAdminIntegrationTest.modifyConfigs.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
client = Admin.create(super.createConfig)
@ -3464,17 +3369,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
.all().get(15, TimeUnit.SECONDS)
if (isKRaftTest()) {
// In KRaft mode, we don't yet support altering configs on controller nodes, except by setting
// default node configs. Therefore, we have to set the dynamic config directly to test this.
val controllerNodeResource = new ConfigResource(ConfigResource.Type.BROKER,
controllerServer.config.nodeId.toString)
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(controllerNodeResource,
Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get()
ensureConsistentKRaftMetadata()
}
// In KRaft mode, we don't yet support altering configs on controller nodes, except by setting
// default node configs. Therefore, we have to set the dynamic config directly to test this.
val controllerNodeResource = new ConfigResource(ConfigResource.Type.BROKER,
controllerServer.config.nodeId.toString)
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(controllerNodeResource,
Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get()
ensureConsistentKRaftMetadata()
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "").toString.equals("34")),
@ -3526,11 +3429,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
topicConfigs.get(TopicConfig.SEGMENT_JITTER_MS_CONFIG))
// From static broker config by the synonym LogRollTimeHoursProp.
val segmentMsPropType = if (isKRaftTest()) {
ConfigSource.STATIC_BROKER_CONFIG
} else {
ConfigSource.DEFAULT_CONFIG
}
val segmentMsPropType = ConfigSource.STATIC_BROKER_CONFIG
assertEquals(new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, "7200000",
segmentMsPropType, false, false, Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.SEGMENT_MS_CONFIG))
@ -3636,7 +3535,7 @@ object PlaintextAdminIntegrationTest {
var topicConfigEntries2 = Seq(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")).asJava
val brokerConfigEntries = Seq(new ConfigEntry(ServerConfigs.BROKER_ID_CONFIG, "10")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = admin.alterConfigs(Map(

View File

@ -14,9 +14,8 @@ package kafka.api
import java.time
import kafka.security.JaasTestUtils
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils._
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._
@ -31,7 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ZkConfigs}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
@ -50,7 +49,6 @@ import scala.util.{Failure, Success, Try}
@Timeout(120)
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val zkAuthorizerClassName = classOf[AclAuthorizer].getName
val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
var superUserAdmin: Admin = _
@ -61,18 +59,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
// set this to use delegation token
if (TestInfoUtils.isKRaft(testInfo)) {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
// controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user
this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString)
this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
} else {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, zkAuthorizerClassName)
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
}
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
// controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user
this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString)
this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
// Enable delegationTokenControlManager
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, secretKey)
@ -140,7 +131,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
@ParameterizedTest
@Timeout(30)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
val config = createConfig
// this will cause timeout connecting to broker
@ -159,7 +150,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
@ParameterizedTest
@Timeout(30)
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
val config = createConfig
// this will cause timeout connecting to broker
@ -175,7 +166,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk","kraft"))
@ValueSource(strings = Array("kraft"))
def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): Unit = {
client = createAdminClient
val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
@ -211,7 +202,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclOperations(quorum: String): Unit = {
client = createAdminClient
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
@ -233,7 +224,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclOperations2(quorum: String): Unit = {
client = createAdminClient
val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
@ -260,7 +251,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclDescribe(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
@ -288,7 +279,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclDelete(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
@ -356,7 +347,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
//noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testLegacyAclOpsNeverAffectOrReturnPrefixed(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned.
@ -402,7 +393,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAttemptToCreateInvalidAcls(quorum: String): Unit = {
client = createAdminClient
val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL),
@ -416,7 +407,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
client = createAdminClient
val timeout = Long.MaxValue
@ -429,7 +420,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
client = createAdminClient
val timeout = -1
@ -528,7 +519,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAclAuthorizationDenied(quorum: String): Unit = {
client = createAdminClient
@ -578,7 +569,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateTopicsResponseMetadataAndConfig(quorum: String): Unit = {
val topic1 = "mytopic1"
val topic2 = "mytopic2"
@ -639,7 +630,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testExpireDelegationToken(quorum: String): Unit = {
client = createAdminClient
val createDelegationTokenOptions = new CreateDelegationTokenOptions()

View File

@ -17,24 +17,28 @@ import java.util.concurrent._
import java.util.Properties
import com.yammer.metrics.core.Gauge
import kafka.security.JaasTestUtils
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.CreateAclsResult
import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SecurityProtocol, SslAuthenticationContext}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.{AfterEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.{Seq, mutable}
import scala.compat.java8.OptionConverters
object SslAdminIntegrationTest {
@ -45,7 +49,7 @@ object SslAdminIntegrationTest {
val serverUser = "server"
val clientCn = "client"
class TestableAclAuthorizer extends AclAuthorizer {
class TestableStandardAuthorizer extends StandardAuthorizer with ClusterMetadataAuthorizer {
override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
lastUpdateRequestContext = Some(requestContext)
@ -65,11 +69,10 @@ object SslAdminIntegrationTest {
semaphore.foreach(_.acquire())
try {
action.apply().asScala.zip(futures).foreach { case (baseFuture, resultFuture) =>
baseFuture.whenComplete { (result, exception) =>
if (exception != null)
resultFuture.completeExceptionally(exception)
else
resultFuture.complete(result)
try {
resultFuture.complete(baseFuture.toCompletableFuture.get())
} catch {
case e: Throwable => resultFuture.completeExceptionally(e)
}
}
} finally {
@ -110,19 +113,39 @@ object SslAdminIntegrationTest {
}
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
override val kraftAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableStandardAuthorizer].getName
this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
override protected def securityProtocol = SecurityProtocol.SSL
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser)
private val extraControllerSecurityProtocol = SecurityProtocol.SSL
override def setUpSasl(): Unit = {
SslAdminIntegrationTest.semaphore = None
SslAdminIntegrationTest.executor = None
SslAdminIntegrationTest.lastUpdateRequestContext = None
startSasl(jaasSections(List.empty, None, ZkSasl))
startSasl(jaasSections(List.empty, None, KafkaSasl))
}
override def createConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000")
config
}
override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
val configs = super.kraftControllerConfigs(testInfo)
JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false, OptionConverters.toJava(trustStoreFile), s"controller")
.forEach((k, v) => configs.foreach(c => c.put(k, v)))
configs
}
override def extraControllerSecurityProtocols(): Seq[SecurityProtocol] = {
Seq(extraControllerSecurityProtocol)
}
@AfterEach
@ -135,13 +158,15 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
super.tearDown()
}
@Test
def testAclUpdatesUsingSynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {
verifyAclUpdates()
}
@Test
def testAclUpdatesUsingAsynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAclUpdatesUsingAsynchronousAuthorizer(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
verifyAclUpdates()
}
@ -150,31 +175,36 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using synchronous authorizer are performed synchronously
* on request threads without any performance overhead introduced by a purgatory.
*/
@Test
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(quorum: String): Unit = {
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
waitForNoBlockedRequestThreads()
useBoostrapControllers()
// Queue requests until all threads are blocked. ACL create requests are sent to least loaded
// node, so we may need more than `numRequestThreads` requests to block all threads.
val aclFutures = mutable.Buffer[CreateAclsResult]()
while (blockedRequestThreads.size < numRequestThreads) {
// In KRaft mode, ACL creation is handled exclusively by controller servers, not brokers.
// Therefore, only the number of controller I/O threads is relevant in this context.
val numReqThreads = controllerServers.head.config.numIoThreads * controllerServers.size
while (blockedRequestThreads.size < numReqThreads) {
aclFutures += createAdminClient.createAcls(List(acl2).asJava)
assertTrue(aclFutures.size < numRequestThreads * 10,
s"Request threads not blocked numRequestThreads=$numRequestThreads blocked=$blockedRequestThreads")
assertTrue(aclFutures.size < numReqThreads * 10,
s"Request threads not blocked numRequestThreads=$numReqThreads blocked=$blockedRequestThreads aclFutures=${aclFutures.size}")
}
assertEquals(0, purgatoryMetric("NumDelayedOperations"))
assertEquals(0, purgatoryMetric("PurgatorySize"))
// Verify that operations on other clients are blocked
val describeFuture = createAdminClient.describeCluster().clusterId()
assertFalse(describeFuture.isDone)
val listPartitionReassignmentsFuture = createAdminClient.listPartitionReassignments().reassignments()
assertFalse(listPartitionReassignmentsFuture.isDone)
// Release the semaphore and verify that all requests complete
testSemaphore.release(aclFutures.size)
waitForNoBlockedRequestThreads()
assertNotNull(describeFuture.get(10, TimeUnit.SECONDS))
assertNotNull(listPartitionReassignmentsFuture.get(10, TimeUnit.SECONDS))
// If any of the requests time out since we were blocking the threads earlier, retry the request.
val numTimedOut = aclFutures.count { future =>
try {
@ -197,19 +227,25 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using an asynchronous authorizer are completed asynchronously
* using a purgatory, enabling other requests to be processed even when ACL updates are blocked.
*/
@Test
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
waitForNoBlockedRequestThreads()
val aclFutures = (0 until numRequestThreads).map(_ => createAdminClient.createAcls(List(acl2).asJava))
useBoostrapControllers()
// In KRaft mode, ACL creation is handled exclusively by controller servers, not brokers.
// Therefore, only the number of controller I/O threads is relevant in this context.
val numReqThreads = controllerServers.head.config.numIoThreads * controllerServers.size
val aclFutures = (0 until numReqThreads).map(_ => createAdminClient.createAcls(List(acl2).asJava))
waitForNoBlockedRequestThreads()
assertTrue(aclFutures.forall(future => !future.all.isDone))
// Other requests should succeed even though ACL updates are blocked
assertNotNull(createAdminClient.describeCluster().clusterId().get(10, TimeUnit.SECONDS))
assertNotNull(createAdminClient.listPartitionReassignments().reassignments().get(10, TimeUnit.SECONDS))
TestUtils.waitUntilTrue(() => purgatoryMetric("PurgatorySize") > 0, "PurgatorySize metrics not updated")
TestUtils.waitUntilTrue(() => purgatoryMetric("NumDelayedOperations") > 0, "NumDelayedOperations metrics not updated")
@ -238,6 +274,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
useBoostrapControllers()
client = createAdminClient
val results = client.createAcls(List(acl2, acl3).asJava).values
assertEquals(Set(acl2, acl3), results.keySet().asScala)
@ -267,7 +304,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
requestThreads.filter(_.getState == Thread.State.WAITING).toList
}
private def numRequestThreads = servers.head.config.numIoThreads * servers.size
private def numRequestThreads = {
brokers.head.config.numIoThreads * (brokers.size + controllerServers.size)
}
private def waitForNoBlockedRequestThreads(): Unit = {
val (blockedThreads, _) = TestUtils.computeUntilTrue(blockedRequestThreads)(_.isEmpty)
@ -298,4 +337,17 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
private def useBoostrapControllers(): Unit = {
val controllerListenerName = ListenerName.forSecurityProtocol(extraControllerSecurityProtocol)
val config = controllerServers.map { s =>
val listener = s.config.effectiveAdvertisedControllerListeners
.find(_.listenerName == controllerListenerName)
.getOrElse(throw new IllegalArgumentException(s"Could not find listener with name $controllerListenerName"))
Utils.formatAddress(listener.host, s.socketServer.boundPort(controllerListenerName))
}.mkString(",")
adminClientConfig.remove(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, config)
}
}

View File

@ -314,6 +314,10 @@ abstract class QuorumTestHarness extends Logging {
newKRaftQuorum(new Properties())
}
protected def extraControllerSecurityProtocols(): Seq[SecurityProtocol] = {
Seq.empty
}
protected def newKRaftQuorum(overridingProps: Properties): KRaftQuorumImplementation = {
val propsList = kraftControllerConfigs(testInfo)
if (propsList.size != 1) {
@ -331,9 +335,12 @@ abstract class QuorumTestHarness extends Logging {
val metadataDir = TestUtils.tempDir()
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
val proto = controllerListenerSecurityProtocol.toString
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"CONTROLLER:$proto")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",")
val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",")
val listenerNames = extraControllerSecurityProtocols().mkString(",")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"CONTROLLER:$proto,$securityProtocolMaps")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0,$listeners")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, s"CONTROLLER,$listenerNames")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0")
// Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")

View File

@ -399,7 +399,7 @@ object TestUtils extends Logging {
): Admin = {
val adminClientProperties = new Properties()
adminClientProperties.putAll(adminConfig)
if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG) && !adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)) {
adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(brokers, listenerName))
}
Admin.create(adminClientProperties)