KAFKA-15289: Support KRaft mode in RequestQuotaTest (#14201)

Enable KRaft mode for RequestQuotaTest; two follow-up tasks remain to be done.

Reviewers: dengziming <dengziming1993@gmail.com>
This commit is contained in:
vveicc 2023-08-14 17:04:15 +08:00 committed by GitHub
parent d91c9bd2b5
commit 43751d8d05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 43 deletions

View File

@ -298,6 +298,10 @@ public enum ApiKeys {
return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
}
/**
 * Returns the set of APIs served by a broker running in KRaft mode,
 * i.e. everything registered for the BROKER listener type.
 */
public static EnumSet<ApiKeys> kraftBrokerApis() {
    final ApiMessageType.ListenerType listener = ApiMessageType.ListenerType.BROKER;
    return apisForListener(listener);
}
/**
 * Returns the set of APIs served by a KRaft controller,
 * i.e. everything registered for the CONTROLLER listener type.
 */
public static EnumSet<ApiKeys> controllerApis() {
    final ApiMessageType.ListenerType listener = ApiMessageType.ListenerType.CONTROLLER;
    return apisForListener(listener);
}

View File

@ -14,43 +14,45 @@
package kafka.server
import java.net.InetAddress
import java.util
import java.util.concurrent.{Executors, Future, TimeUnit}
import java.util.{Collections, Optional, Properties}
import kafka.api.LeaderAndIsr
import kafka.network.RequestChannel.Session
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, _}
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.ClientQuotaFilter
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.common._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.net.InetAddress
import java.util
import java.util.concurrent.{Executors, Future, TimeUnit}
import java.util.{Collections, Optional, Properties}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@ -66,7 +68,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val smallQuotaProducerClientId = "small-quota-producer-client"
private val smallQuotaConsumerClientId = "small-quota-consumer-client"
private val brokerId: Integer = 0
private var leaderNode: KafkaServer = _
private var leaderNode: KafkaBroker = _
// Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low
case class Task(apiKey: ApiKeys, future: Future[_])
@ -79,9 +81,19 @@ class RequestQuotaTest extends BaseRequestTest {
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName)
properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
if (isKRaftTest()) {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.KraftTestAuthorizer].getName)
} else {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.ZkTestAuthorizer].getName)
}
}
/**
 * Controller-side config overrides: install the test principal builder so the
 * KRaft controller resolves client principals the same way the broker does.
 */
override def kraftControllerConfigs(): Seq[Properties] = {
  val controllerProps = new Properties()
  controllerProps.setProperty(KafkaConfig.PrincipalBuilderClassProp,
    classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
  Seq(controllerProps)
}
@BeforeEach
@ -90,76 +102,95 @@ class RequestQuotaTest extends BaseRequestTest {
super.setUp(testInfo)
createTopic(topic, numPartitions)
leaderNode = servers.head
leaderNode = brokers.head
// Change default client-id request quota to a small value and a single unthrottledClient with a large quota
val quotaProps = new Properties()
quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
adminZkClient.changeClientIdConfig("<default>", quotaProps)
changeClientIdConfig("<default>", quotaProps)
quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000")
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
// Client ids with small producer and consumer (fetch) quotas. Quota values were picked so that both
// producer/consumer and request quotas are violated on the first produce/consume operation, and the delay due to
// producer/consumer quota violation will be longer than the delay due to request quota violation.
quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1")
quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1")
quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
TestUtils.retry(20000) {
val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request
assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"), s"Default request quota not set")
assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId), s"Request quota override not set")
val produceQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.produce
val produceQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.produce
assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", smallQuotaProducerClientId), s"Produce quota override not set")
val consumeQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.fetch
val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch
assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", smallQuotaConsumerClientId), s"Consume quota override not set")
}
}
/**
 * Applies the given client-id quota overrides, routing through the Admin API in
 * KRaft mode or directly through ZooKeeper in legacy mode.
 * The sentinel id "<default>" targets the default client-id quota entity.
 */
private def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
  if (!isKRaftTest()) {
    adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
  } else {
    // In the Admin API the default client-id entity is addressed with a null value.
    val entityId = if (sanitizedClientId == "<default>") null else sanitizedClientId
    val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> entityId).asJava)
    val ops = configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava
    val alteration = new ClientQuotaAlteration(entity, ops)
    createAdminClient().alterClientQuotas(Collections.singleton(alteration)).all().get()
  }
}
@AfterEach
override def tearDown(): Unit = {
  try {
    // Stop any in-flight quota tasks before the cluster itself is torn down.
    executor.shutdownNow()
  } finally {
    super.tearDown()
  }
}
/**
 * Every client-facing API, plus the cluster actions that are throttled on the
 * broker, must report a non-zero throttle time once the request quota is violated.
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testResponseThrottleTime(quorum: String): Unit = {
  for (apiKey <- clientActions ++ clusterActionsWithThrottleForBroker)
    submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
  waitAndCheckResults()
}
/**
 * When a producer violates both its byte-rate and its request quota, the
 * produce response must carry the (longer) produce-quota throttle time.
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(quorum: String): Unit = {
  submitTest(ApiKeys.PRODUCE, () => checkSmallQuotaProducerRequestThrottleTime())
  waitAndCheckResults()
}
/**
 * When a consumer violates both its byte-rate and its request quota, the
 * fetch response must carry the (longer) consumer-quota throttle time.
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(quorum: String): Unit = {
  submitTest(ApiKeys.FETCH, () => checkSmallQuotaConsumerRequestThrottleTime())
  waitAndCheckResults()
}
/**
 * A client with a large quota override must never be throttled, across every
 * client-facing API.
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUnthrottledClient(quorum: String): Unit = {
  clientActions.foreach { apiKey =>
    submitTest(apiKey, () => checkUnthrottledClient(apiKey))
  }
  waitAndCheckResults()
}
/**
 * Cluster actions (other than the explicitly-throttled ones) are exempt from
 * request quotas; verify the exempt-request metric is recorded for them.
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testExemptRequestTime(quorum: String): Unit = {
  // Exclude `DESCRIBE_QUORUM` — maybe it shouldn't be a cluster action.
  val actions = clusterActions -- clusterActionsWithThrottleForBroker -- RequestQuotaTest.Envelope - ApiKeys.DESCRIBE_QUORUM
  for (apiKey <- actions) {
    submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
  }
  waitAndCheckResults()
}
/**
 * Requests from an unauthorized principal must still be throttled; exercises
 * every broker API (KRaft or ZK listener set, depending on the quorum mode).
 */
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedThrottle(quorum: String): Unit = {
  RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
  val apiKeys = if (isKRaftTest()) ApiKeys.kraftBrokerApis else ApiKeys.zkBrokerApis
  for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) {
    submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
  }
  waitAndCheckResults()
}
/**
 * Client-facing APIs for the current quorum mode: the full broker listener set
 * minus cluster actions, SASL handshake/authenticate, and envelope.
 */
private def clientActions: Set[ApiKeys] = {
  val brokerApis = if (isKRaftTest()) ApiKeys.kraftBrokerApis else ApiKeys.zkBrokerApis
  brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope
}
/** APIs flagged as cluster actions on the broker listener for the current quorum mode. */
private def clusterActions: Set[ApiKeys] = {
  val brokerApis = if (isKRaftTest()) ApiKeys.kraftBrokerApis else ApiKeys.zkBrokerApis
  brokerApis.asScala.filter(_.clusterAction).toSet
}
/**
 * Cluster actions that the broker throttles. `ALLOCATE_PRODUCER_IDS` is excluded
 * in KRaft mode because it is served by the kraft controller, not the broker.
 */
private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = {
  if (isKRaftTest())
    Set(ApiKeys.UPDATE_FEATURES)
  else
    Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES)
}
/** Builds a request-channel Session for the given user name (no client address attached). */
def session(user: String): Session = {
  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
  Session(principal, null)
}
private def throttleTimeMetricValue(clientId: String): Double = {
@ -599,6 +657,10 @@ class RequestQuotaTest extends BaseRequestTest {
new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest(
tp, 10, 5, Collections.singletonList(3)))
case ApiKeys.DESCRIBE_QUORUM =>
new DescribeQuorumRequest.Builder(DescribeQuorumRequest.singletonRequest(
Topic.CLUSTER_METADATA_TOPIC_PARTITION))
case ApiKeys.ALTER_PARTITION =>
new AlterPartitionRequest.Builder(new AlterPartitionRequestData(), true)
@ -767,23 +829,29 @@ class RequestQuotaTest extends BaseRequestTest {
}
object RequestQuotaTest {
val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES)
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val Envelope = Set(ApiKeys.ENVELOPE)
val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions -- Envelope
val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
// Principal used for all client connections. This is modified by tests which
// check unauthorized code path
var principal = KafkaPrincipal.ANONYMOUS
/**
 * ZooKeeper-mode test authorizer: denies every action for the unauthorized
 * test principal and allows everything else.
 */
class ZkTestAuthorizer extends AclAuthorizer {
  override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
    actions.asScala.map { _ =>
      if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
    }.asJava
  }
}
/**
 * KRaft-mode test authorizer: denies every action for the unauthorized
 * test principal and allows everything else.
 */
class KraftTestAuthorizer extends StandardAuthorizer {
  override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
    val verdict =
      if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED
      else AuthorizationResult.DENIED
    actions.asScala.map(_ => verdict).asJava
  }
}
class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal