From 9f092420f117eab18c26a54f7687a331e01e6f50 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Tue, 15 Jul 2025 01:58:25 +0530 Subject: [PATCH] MINOR: Extend Quota Tests for ShareFetch requests (#20163) ### Summary Extends RequestQuotaTest to include ShareFetch API quota testing, ensuring compliance with KIP-932. ### Key Changes - New test: testShareFetchUsesSameFetchSensor() - Verifies ShareFetch and Fetch use the same FETCH quota sensor - New test: testResponseThrottleTimeWhenBothShareFetchAndRequestQuotasViolated() - Tests ShareFetch throttling behaviour - Request builder: Added ApiKeys.SHARE_FETCH case with proper ShareFetch request construction - Some minor cleanup wrt use of Collections Reviewers: Apoorv Mittal --- .../unit/kafka/server/RequestQuotaTest.scala | 180 +++++++++++++----- 1 file changed, 130 insertions(+), 50 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index b39debd4196..dfa3e959536 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -48,7 +48,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} import java.net.InetAddress import java.util import java.util.concurrent.{Executors, Future, TimeUnit} -import java.util.{Collections, Optional, Properties} +import java.util.{Optional, Properties} import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -151,6 +151,52 @@ class RequestQuotaTest extends BaseRequestTest { waitAndCheckResults() } + @Test + def testResponseThrottleTimeWhenBothShareFetchAndRequestQuotasViolated(): Unit = { + submitTest(ApiKeys.SHARE_FETCH, () => checkSmallQuotaShareFetchRequestThrottleTime()) + waitAndCheckResults() + } + + + @Test + def testShareFetchUsesSameFetchSensor(): Unit = { + // This test verifies that ShareFetch and Fetch use the same FETCH quota sensor per KIP-932 + val testClientId = "same-sensor-test-client" + + val quotaProps = new Properties() + quotaProps.put(QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1") // Very small quota + quotaProps.put(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01") // Very small request quota + changeClientIdConfig(Sanitizer.sanitize(testClientId), quotaProps) + + TestUtils.retry(20000) { + val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch + assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", testClientId), + s"Consumer quota override not set") + } + + // First, make a Fetch request and verify it uses FETCH quota + val fetchClient = Client(testClientId, ApiKeys.FETCH) + val fetchThrottled = fetchClient.runUntil(_.throttleTimeMs > 0) + assertTrue(fetchThrottled, "Fetch should be throttled") + + // Check quota types to verify which one is being used + val fetchThrottleTimeAfterFetch = throttleTimeMetricValueForQuotaType(testClientId, QuotaType.FETCH) + + // Now make a ShareFetch request and verify it ALSO uses FETCH quota sensor + val shareFetchClient = Client(testClientId, ApiKeys.SHARE_FETCH) + val shareFetchThrottled = shareFetchClient.runUntil(_.throttleTimeMs > 0) + assertTrue(shareFetchThrottled, "ShareFetch should be throttled") + + // Check quota types after ShareFetch + val fetchThrottleTimeAfterShareFetch = throttleTimeMetricValueForQuotaType(testClientId, QuotaType.FETCH) + + // Verify both requests use FETCH quota (not REQUEST quota) + assertTrue(!fetchThrottleTimeAfterFetch.isNaN && fetchThrottleTimeAfterFetch > 0, + s"Fetch should use FETCH quota sensor: $fetchThrottleTimeAfterFetch") + assertTrue(!fetchThrottleTimeAfterShareFetch.isNaN && fetchThrottleTimeAfterShareFetch > 0, + s"ShareFetch should use FETCH quota sensor: $fetchThrottleTimeAfterShareFetch") + } + @Test def testUnthrottledClient(): Unit = { for (apiKey <- clientActions) { @@ -234,8 +280,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.PRODUCE => requests.ProduceRequest.builder(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection( - Collections.singletonList(new ProduceRequestData.TopicProduceData() - .setTopicId(getTopicIds().get(tp.topic()).get).setPartitionData(Collections.singletonList( + util.List.of(new ProduceRequestData.TopicProduceData() + .setTopicId(getTopicIds().get(tp.topic()).get).setPartitionData(util.List.of( new ProduceRequestData.PartitionProduceData() .setIndex(tp.partition()) .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes)))))) @@ -249,17 +295,17 @@ class RequestQuotaTest extends BaseRequestTest { FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion, 0, 0, partitionMap) case ApiKeys.METADATA => - new MetadataRequest.Builder(List(topic).asJava, true) + new MetadataRequest.Builder(util.List.of(topic), true) case ApiKeys.LIST_OFFSETS => val topic = new ListOffsetsTopic() .setName(tp.topic) - .setPartitions(List(new ListOffsetsPartition() + .setPartitions(util.List.of(new ListOffsetsPartition() .setPartitionIndex(tp.partition) .setTimestamp(0L) - .setCurrentLeaderEpoch(15)).asJava) + .setCurrentLeaderEpoch(15))) ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) - .setTargetTimes(List(topic).asJava) + .setTargetTimes(util.List.of(topic)) case ApiKeys.OFFSET_COMMIT => OffsetCommitRequest.Builder.forTopicNames( @@ -268,11 +314,11 @@ class RequestQuotaTest extends BaseRequestTest { .setGenerationIdOrMemberEpoch(1) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) .setTopics( - Collections.singletonList( + util.List.of( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName(topic) .setPartitions( - Collections.singletonList( + util.List.of( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) @@ -286,15 +332,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.OFFSET_FETCH => OffsetFetchRequest.Builder.forTopicNames( new OffsetFetchRequestData() - .setGroups(List( + .setGroups(util.List.of( new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId("test-group") - .setTopics(List( + .setTopics(util.List.of( new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName(tp.topic) - .setPartitionIndexes(List[Integer](tp.partition).asJava) - ).asJava) - ).asJava), + .setPartitionIndexes(util.List.of[Integer](tp.partition)) + )) + )), false ) @@ -302,7 +348,7 @@ class RequestQuotaTest extends BaseRequestTest { new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) - .setCoordinatorKeys(Collections.singletonList("test-group"))) + .setCoordinatorKeys(util.List.of("test-group"))) case ApiKeys.JOIN_GROUP => new JoinGroupRequest.Builder( @@ -314,7 +360,7 @@ class RequestQuotaTest extends BaseRequestTest { .setProtocolType("consumer") .setProtocols( new JoinGroupRequestProtocolCollection( - Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + util.List.of(new JoinGroupRequestData.JoinGroupRequestProtocol() .setName("consumer-range") .setMetadata("test".getBytes())).iterator() ) @@ -333,7 +379,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest.Builder( "test-leave-group", - Collections.singletonList( + util.List.of( new MemberIdentity() .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)) ) @@ -344,11 +390,11 @@ class RequestQuotaTest extends BaseRequestTest { .setGroupId("test-sync-group") .setGenerationId(1) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) - .setAssignments(Collections.emptyList()) + .setAssignments(util.List.of) ) case ApiKeys.DESCRIBE_GROUPS => - new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava)) + new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(util.List.of("test-group"))) case ApiKeys.LIST_GROUPS => new ListGroupsRequest.Builder(new ListGroupsRequestData()) @@ -365,23 +411,23 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_TOPICS => new CreateTopicsRequest.Builder( new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(Collections.singleton( + new CreatableTopicCollection(util.Set.of( new CreatableTopic().setName("topic-2").setNumPartitions(1). setReplicationFactor(1.toShort)).iterator()))) case ApiKeys.DELETE_TOPICS => new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() - .setTopicNames(Collections.singletonList("topic-2")) + .setTopicNames(util.List.of("topic-2")) .setTimeoutMs(5000)) case ApiKeys.DELETE_RECORDS => new DeleteRecordsRequest.Builder( new DeleteRecordsRequestData() .setTimeoutMs(5000) - .setTopics(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsTopic() + .setTopics(util.List.of(new DeleteRecordsRequestData.DeleteRecordsTopic() .setName(tp.topic()) - .setPartitions(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsPartition() + .setPartitions(util.List.of(new DeleteRecordsRequestData.DeleteRecordsPartition() .setPartitionIndex(tp.partition()) .setOffset(0L)))))) @@ -395,14 +441,14 @@ class RequestQuotaTest extends BaseRequestTest { val epochs = new OffsetForLeaderTopicCollection() epochs.add(new OffsetForLeaderTopic() .setTopic(tp.topic()) - .setPartitions(List(new OffsetForLeaderPartition() + .setPartitions(util.List.of(new OffsetForLeaderPartition() .setPartition(tp.partition()) .setLeaderEpoch(0) - .setCurrentLeaderEpoch(15)).asJava)) + .setCurrentLeaderEpoch(15)))) OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs) case ApiKeys.ADD_PARTITIONS_TO_TXN => - AddPartitionsToTxnRequest.Builder.forClient("test-transactional-id", 1, 0, List(tp).asJava) + AddPartitionsToTxnRequest.Builder.forClient("test-transactional-id", 1, 0, util.List.of(tp)) case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData() @@ -430,7 +476,7 @@ class RequestQuotaTest extends BaseRequestTest { "test-txn-group", 2, 0, - Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava, + util.Map.of[TopicPartition, TxnOffsetCommitRequest.CommittedOffset], true ) @@ -438,7 +484,7 @@ class RequestQuotaTest extends BaseRequestTest { new DescribeAclsRequest.Builder(AclBindingFilter.ANY) case ApiKeys.CREATE_ACLS => - new CreateAclsRequest.Builder(new CreateAclsRequestData().setCreations(Collections.singletonList( + new CreateAclsRequest.Builder(new CreateAclsRequestData().setCreations(util.List.of( new CreateAclsRequestData.AclCreation() .setResourceType(AdminResourceType.TOPIC.code) .setResourceName("mytopic") @@ -448,7 +494,7 @@ class RequestQuotaTest extends BaseRequestTest { .setOperation(AclOperation.WRITE.code) .setPermissionType(AclPermissionType.DENY.code)))) case ApiKeys.DELETE_ACLS => - new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(Collections.singletonList( + new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(util.List.of( new DeleteAclsRequestData.DeleteAclsFilter() .setResourceTypeFilter(AdminResourceType.TOPIC.code) .setResourceNameFilter(null) @@ -459,14 +505,14 @@ class RequestQuotaTest extends BaseRequestTest { .setPermissionType(AclPermissionType.DENY.code)))) case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() - .setResources(Collections.singletonList(new DescribeConfigsRequestData.DescribeConfigsResource() + .setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource() .setResourceType(ConfigResource.Type.TOPIC.id) .setResourceName(tp.topic)))) case ApiKeys.ALTER_CONFIGS => new AlterConfigsRequest.Builder( - Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), - new AlterConfigsRequest.Config(Collections.singleton( + util.Map.of(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), + new AlterConfigsRequest.Config(util.Set.of( new AlterConfigsRequest.ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "1000000") ))), true) @@ -475,7 +521,7 @@ class RequestQuotaTest extends BaseRequestTest { .setPath(logDir) dir.topics.add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic() .setName(tp.topic) - .setPartitions(Collections.singletonList(tp.partition))) + .setPartitions(util.List.of(tp.partition))) val data = new AlterReplicaLogDirsRequestData() data.dirs.add(dir) new AlterReplicaLogDirsRequest.Builder(data) @@ -484,7 +530,7 @@ class RequestQuotaTest extends BaseRequestTest { val data = new DescribeLogDirsRequestData() data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic() .setTopic(tp.topic) - .setPartitions(Collections.singletonList(tp.partition))) + .setPartitions(util.List.of(tp.partition))) new DescribeLogDirsRequest.Builder(data) case ApiKeys.CREATE_PARTITIONS => @@ -497,7 +543,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenRequest.Builder( new CreateDelegationTokenRequestData() - .setRenewers(Collections.singletonList(new CreateDelegationTokenRequestData.CreatableRenewers() + .setRenewers(util.List.of(new CreateDelegationTokenRequestData.CreatableRenewers() .setPrincipalType("User") .setPrincipalName("test"))) .setMaxLifetimeMs(1000) @@ -510,7 +556,7 @@ class RequestQuotaTest extends BaseRequestTest { .setExpiryTimePeriodMs(1000L)) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => - new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test"))) + new DescribeDelegationTokenRequest.Builder(util.List.of(SecurityUtils.parseKafkaPrincipal("User:test"))) case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenRequest.Builder( @@ -520,12 +566,12 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DELETE_GROUPS => new DeleteGroupsRequest.Builder(new DeleteGroupsRequestData() - .setGroupsNames(Collections.singletonList("test-group"))) + .setGroupsNames(util.List.of("test-group"))) case ApiKeys.ELECT_LEADERS => new ElectLeadersRequest.Builder( ElectionType.PREFERRED, - Collections.singletonList(new TopicPartition("my_topic", 0)), + util.List.of(new TopicPartition("my_topic", 0)), 0 ) @@ -548,9 +594,9 @@ class RequestQuotaTest extends BaseRequestTest { new OffsetDeleteRequestData() .setGroupId("test-group") .setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection( - Collections.singletonList(new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + util.List.of(new OffsetDeleteRequestData.OffsetDeleteRequestTopic() .setName("test-topic") - .setPartitions(Collections.singletonList( + .setPartitions(util.List.of( new OffsetDeleteRequestData.OffsetDeleteRequestPartition() .setPartitionIndex(0)))).iterator()))) @@ -558,7 +604,7 @@ class RequestQuotaTest extends BaseRequestTest { new DescribeClientQuotasRequest.Builder(ClientQuotaFilter.all()) case ApiKeys.ALTER_CLIENT_QUOTAS => - new AlterClientQuotasRequest.Builder(List.empty.asJava, false) + new AlterClientQuotasRequest.Builder(util.List.of, false) case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => new DescribeUserScramCredentialsRequest.Builder(new DescribeUserScramCredentialsRequestData()) @@ -574,7 +620,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.END_QUORUM_EPOCH => new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest( - tp, 10, 5, Collections.singletonList(3))) + tp, 10, 5, util.List.of(3))) case ApiKeys.DESCRIBE_QUORUM => new DescribeQuorumRequest.Builder(DescribeQuorumRequest.singletonRequest( @@ -593,7 +639,7 @@ class RequestQuotaTest extends BaseRequestTest { "client-id", 0 ) - val embedRequestData = new AlterClientQuotasRequest.Builder(List.empty.asJava, false).build() + val embedRequestData = new AlterClientQuotasRequest.Builder(util.List.of, false).build() .serializeWithHeader(requestHeader) new EnvelopeRequest.Builder(embedRequestData, new Array[Byte](0), InetAddress.getByName("192.168.1.1").getAddress) @@ -603,9 +649,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_PRODUCERS => new DescribeProducersRequest.Builder(new DescribeProducersRequestData() - .setTopics(List(new DescribeProducersRequestData.TopicRequest() + .setTopics(util.List.of(new DescribeProducersRequestData.TopicRequest() .setName("test-topic") - .setPartitionIndexes(List(1, 2, 3).map(Int.box).asJava)).asJava)) + .setPartitionIndexes(util.List.of[Integer](1, 2, 3))))) case ApiKeys.BROKER_REGISTRATION => new BrokerRegistrationRequest.Builder(new BrokerRegistrationRequestData()) @@ -618,7 +664,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_TRANSACTIONS => new DescribeTransactionsRequest.Builder(new DescribeTransactionsRequestData() - .setTransactionalIds(List("test-transactional-id").asJava)) + .setTransactionalIds(util.List.of("test-transactional-id"))) case ApiKeys.LIST_TRANSACTIONS => new ListTransactionsRequest.Builder(new ListTransactionsRequestData()) @@ -654,7 +700,23 @@ class RequestQuotaTest extends BaseRequestTest { new ShareGroupDescribeRequest.Builder(new ShareGroupDescribeRequestData()) case ApiKeys.SHARE_FETCH => - new ShareFetchRequest.Builder(new ShareFetchRequestData()) + new ShareFetchRequest.Builder( + new ShareFetchRequestData() + .setGroupId("test-share-group") + .setMemberId(Uuid.randomUuid().toString) + .setShareSessionEpoch(0) + .setMaxWaitMs(0) + .setMinBytes(1) + .setMaxBytes(1000000) + .setTopics(new ShareFetchRequestData.FetchTopicCollection( + util.List.of(new ShareFetchRequestData.FetchTopic() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions( + new ShareFetchRequestData.FetchPartitionCollection( + util.List.of(new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(tp.partition) + ).iterator)) + ).iterator))) case ApiKeys.SHARE_ACKNOWLEDGE => new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData()) @@ -792,6 +854,21 @@ class RequestQuotaTest extends BaseRequestTest { s"Throttle time metrics for request quota updated: $smallQuotaConsumerClient") } + private def checkSmallQuotaShareFetchRequestThrottleTime(): Unit = { + // Request until throttled using client-id with default small consumer quota + // This test verifies ShareFetch is throttled similarly to Fetch (KIP-932) + val smallQuotaShareFetchClient = Client(smallQuotaConsumerClientId, ApiKeys.SHARE_FETCH) + val throttled = smallQuotaShareFetchClient.runUntil(_.throttleTimeMs > 0) + + assertTrue(throttled, s"ShareFetch response not throttled: $smallQuotaShareFetchClient") + // KIP-932: ShareFetch should use the same quota and sensors as Fetch + // Since the implementation uses the same quota mechanisms, we verify throttling occurs + assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.FETCH) > 0, + s"ShareFetch should be throttled using FETCH quota sensors: $smallQuotaShareFetchClient") + assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.REQUEST).isNaN, + s"Throttle time metrics for request quota updated: $smallQuotaShareFetchClient") + } + private def checkUnthrottledClient(apiKey: ApiKeys): Unit = { // Test that request from client with large quota is not throttled @@ -832,9 +909,12 @@ object RequestQuotaTest { class KraftTestAuthorizer extends StandardAuthorizer { override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { - actions.asScala.map { _ => - if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED - }.asJava + val results = new util.ArrayList[AuthorizationResult]() + actions.forEach(_ => { + val result = if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED + results.add(result) + }) + results } }