From 43751d8d0521b1440a823a9430fdb0659ce7c436 Mon Sep 17 00:00:00 2001 From: vveicc Date: Mon, 14 Aug 2023 17:04:15 +0800 Subject: [PATCH] KAFKA-15289: Support KRaft mode in RequestQuotaTest (#14201) Enable kraft mode for RequestQuotaTest, there are 2 works left to be done. Reviewers: dengziming --- .../apache/kafka/common/protocol/ApiKeys.java | 4 + .../unit/kafka/server/RequestQuotaTest.scala | 154 +++++++++++++----- 2 files changed, 115 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3ba8faaba29..97c382ca87f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -298,6 +298,10 @@ public enum ApiKeys { return apisForListener(ApiMessageType.ListenerType.ZK_BROKER); } + public static EnumSet kraftBrokerApis() { + return apisForListener(ApiMessageType.ListenerType.BROKER); + } + public static EnumSet controllerApis() { return apisForListener(ApiMessageType.ListenerType.CONTROLLER); } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c06bde91f91..6e09a3a90c1 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -14,43 +14,45 @@ package kafka.server -import java.net.InetAddress -import java.util -import java.util.concurrent.{Executors, Future, TimeUnit} -import java.util.{Collections, Optional, Properties} import kafka.api.LeaderAndIsr import kafka.network.RequestChannel.Session import kafka.security.authorizer.AclAuthorizer -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.common._ import org.apache.kafka.common.acl._ +import org.apache.kafka.common.config.internals.QuotaConfigs import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, _} +import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.quota.ClientQuotaFilter +import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType} import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} -import org.apache.kafka.common._ -import org.apache.kafka.common.config.internals.QuotaConfigs +import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import java.net.InetAddress +import java.util +import java.util.concurrent.{Executors, Future, TimeUnit} +import java.util.{Collections, Optional, Properties} import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -66,7 +68,7 @@ class RequestQuotaTest extends BaseRequestTest { private val smallQuotaProducerClientId = "small-quota-producer-client" private val smallQuotaConsumerClientId = "small-quota-consumer-client" private val brokerId: Integer = 0 - private var leaderNode: KafkaServer = _ + private var leaderNode: KafkaBroker = _ // Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low case class Task(apiKey: ApiKeys, future: Future[_]) @@ -79,9 +81,19 @@ class RequestQuotaTest extends BaseRequestTest { properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") - properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName) properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName) properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true") + if (isKRaftTest()) { + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.KraftTestAuthorizer].getName) + } else { + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.ZkTestAuthorizer].getName) + } + } + + override def kraftControllerConfigs(): Seq[Properties] = { + val properties = new Properties() + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName) + Seq(properties) } @BeforeEach @@ -90,76 +102,95 @@ class RequestQuotaTest extends BaseRequestTest { super.setUp(testInfo) createTopic(topic, numPartitions) - leaderNode = servers.head + leaderNode = brokers.head // Change default client-id request quota to a small value and a single unthrottledClient with a large quota val quotaProps = new Properties() quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01") quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000") quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000") - adminZkClient.changeClientIdConfig("", quotaProps) + changeClientIdConfig("", quotaProps) quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000") - adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps) + changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps) // Client ids with small producer and consumer (fetch) quotas. Quota values were picked so that both // producer/consumer and request quotas are violated on the first produce/consume operation, and the delay due to // producer/consumer quota violation will be longer than the delay due to request quota violation. quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1") quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01") - adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps) + changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps) quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1") quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01") - adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps) + changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps) TestUtils.retry(20000) { - val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request + val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"), s"Default request quota not set") assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId), s"Request quota override not set") - val produceQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.produce + val produceQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.produce assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", smallQuotaProducerClientId), s"Produce quota override not set") - val consumeQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.fetch + val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", smallQuotaConsumerClientId), s"Consume quota override not set") } } + private def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = { + if (isKRaftTest()) { + val admin = createAdminClient() + admin.alterClientQuotas(Collections.singleton( + new ClientQuotaAlteration( + new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "") null else sanitizedClientId)).asJava), + configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava) + )).all().get() + } else { + adminZkClient.changeClientIdConfig(sanitizedClientId, configs) + } + } + @AfterEach override def tearDown(): Unit = { try executor.shutdownNow() finally super.tearDown() } - @Test - def testResponseThrottleTime(): Unit = { - for (apiKey <- RequestQuotaTest.ClientActions ++ RequestQuotaTest.ClusterActionsWithThrottle) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testResponseThrottleTime(quorum: String): Unit = { + for (apiKey <- clientActions ++ clusterActionsWithThrottleForBroker) submitTest(apiKey, () => checkRequestThrottleTime(apiKey)) waitAndCheckResults() } - @Test - def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(quorum: String): Unit = { submitTest(ApiKeys.PRODUCE, () => checkSmallQuotaProducerRequestThrottleTime()) waitAndCheckResults() } - @Test - def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(quorum: String): Unit = { submitTest(ApiKeys.FETCH, () => checkSmallQuotaConsumerRequestThrottleTime()) waitAndCheckResults() } - @Test - def testUnthrottledClient(): Unit = { - for (apiKey <- RequestQuotaTest.ClientActions) { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUnthrottledClient(quorum: String): Unit = { + for (apiKey <- clientActions) { submitTest(apiKey, () => checkUnthrottledClient(apiKey)) } waitAndCheckResults() } - @Test - def testExemptRequestTime(): Unit = { - val actions = RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle -- RequestQuotaTest.Envelope + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testExemptRequestTime(quorum: String): Unit = { + // Exclude `DESCRIBE_QUORUM`, maybe it shouldn't be a cluster action + val actions = clusterActions -- clusterActionsWithThrottleForBroker -- RequestQuotaTest.Envelope - ApiKeys.DESCRIBE_QUORUM for (apiKey <- actions) { submitTest(apiKey, () => checkExemptRequestMetric(apiKey)) } @@ -167,17 +198,44 @@ class RequestQuotaTest extends BaseRequestTest { waitAndCheckResults() } - @Test - def testUnauthorizedThrottle(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUnauthorizedThrottle(quorum: String): Unit = { RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal - for (apiKey <- ApiKeys.zkBrokerApis.asScala.toSet -- RequestQuotaTest.Envelope) { + val apiKeys = if (isKRaftTest()) ApiKeys.kraftBrokerApis else ApiKeys.zkBrokerApis + for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) { submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) } waitAndCheckResults() } + private def clientActions: Set[ApiKeys] = { + if (isKRaftTest()) { + ApiKeys.kraftBrokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope + } else { + ApiKeys.zkBrokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope + } + } + + private def clusterActions: Set[ApiKeys] = { + if (isKRaftTest()) { + ApiKeys.kraftBrokerApis.asScala.filter(_.clusterAction).toSet + } else { + ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet + } + } + + private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = { + if (isKRaftTest()) { + // Exclude `ALLOCATE_PRODUCER_IDS`, it is enabled for kraft controller instead of broker + Set(ApiKeys.UPDATE_FEATURES) + } else { + Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES) + } + } + def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null) private def throttleTimeMetricValue(clientId: String): Double = { @@ -599,6 +657,10 @@ class RequestQuotaTest extends BaseRequestTest { new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest( tp, 10, 5, Collections.singletonList(3))) + case ApiKeys.DESCRIBE_QUORUM => + new DescribeQuorumRequest.Builder(DescribeQuorumRequest.singletonRequest( + Topic.CLUSTER_METADATA_TOPIC_PARTITION)) + case ApiKeys.ALTER_PARTITION => new AlterPartitionRequest.Builder(new AlterPartitionRequestData(), true) @@ -767,23 +829,29 @@ class RequestQuotaTest extends BaseRequestTest { } object RequestQuotaTest { - val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet - val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES) val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE) val Envelope = Set(ApiKeys.ENVELOPE) - val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions -- Envelope val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized") // Principal used for all client connections. This is modified by tests which // check unauthorized code path var principal = KafkaPrincipal.ANONYMOUS - class TestAuthorizer extends AclAuthorizer { + class ZkTestAuthorizer extends AclAuthorizer { override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { actions.asScala.map { _ => if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED }.asJava } } + + class KraftTestAuthorizer extends StandardAuthorizer { + override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { + actions.asScala.map { _ => + if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED + }.asJava + } + } + class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde { override def build(context: AuthenticationContext): KafkaPrincipal = { principal