MINOR: Extend Quota Tests for ShareFetch requests (#20163)

### Summary
Extends RequestQuotaTest to include ShareFetch API quota testing,
ensuring compliance with KIP-932.

### Key Changes
- New test: testShareFetchUsesSameFetchSensor() - Verifies ShareFetch
and Fetch use the same FETCH quota sensor
- New test:
testResponseThrottleTimeWhenBothShareFetchAndRequestQuotasViolated() -
Tests ShareFetch throttling behaviour
- Request builder: Added ApiKeys.SHARE_FETCH case with proper ShareFetch
request construction
- Some minor cleanup wrt use of Collections

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-07-15 01:58:25 +05:30 committed by GitHub
parent a64f5bf6ab
commit 9f092420f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 130 additions and 50 deletions

View File

@ -48,7 +48,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.net.InetAddress
import java.util
import java.util.concurrent.{Executors, Future, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.util.{Optional, Properties}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@ -151,6 +151,52 @@ class RequestQuotaTest extends BaseRequestTest {
waitAndCheckResults()
}
@Test
def testResponseThrottleTimeWhenBothShareFetchAndRequestQuotasViolated(): Unit = {
submitTest(ApiKeys.SHARE_FETCH, () => checkSmallQuotaShareFetchRequestThrottleTime())
waitAndCheckResults()
}
@Test
def testShareFetchUsesSameFetchSensor(): Unit = {
// This test verifies that ShareFetch and Fetch use the same FETCH quota sensor per KIP-932
val testClientId = "same-sensor-test-client"
val quotaProps = new Properties()
quotaProps.put(QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1") // Very small quota
quotaProps.put(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01") // Very small request quota
changeClientIdConfig(Sanitizer.sanitize(testClientId), quotaProps)
TestUtils.retry(20000) {
val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch
assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", testClientId),
s"Consumer quota override not set")
}
// First, make a Fetch request and verify it uses FETCH quota
val fetchClient = Client(testClientId, ApiKeys.FETCH)
val fetchThrottled = fetchClient.runUntil(_.throttleTimeMs > 0)
assertTrue(fetchThrottled, "Fetch should be throttled")
// Check quota types to verify which one is being used
val fetchThrottleTimeAfterFetch = throttleTimeMetricValueForQuotaType(testClientId, QuotaType.FETCH)
// Now make a ShareFetch request and verify it ALSO uses FETCH quota sensor
val shareFetchClient = Client(testClientId, ApiKeys.SHARE_FETCH)
val shareFetchThrottled = shareFetchClient.runUntil(_.throttleTimeMs > 0)
assertTrue(shareFetchThrottled, "ShareFetch should be throttled")
// Check quota types after ShareFetch
val fetchThrottleTimeAfterShareFetch = throttleTimeMetricValueForQuotaType(testClientId, QuotaType.FETCH)
// Verify both requests use FETCH quota (not REQUEST quota)
assertTrue(!fetchThrottleTimeAfterFetch.isNaN && fetchThrottleTimeAfterFetch > 0,
s"Fetch should use FETCH quota sensor: $fetchThrottleTimeAfterFetch")
assertTrue(!fetchThrottleTimeAfterShareFetch.isNaN && fetchThrottleTimeAfterShareFetch > 0,
s"ShareFetch should use FETCH quota sensor: $fetchThrottleTimeAfterShareFetch")
}
@Test
def testUnthrottledClient(): Unit = {
for (apiKey <- clientActions) {
@ -234,8 +280,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.PRODUCE =>
requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setTopicId(getTopicIds().get(tp.topic()).get).setPartitionData(Collections.singletonList(
util.List.of(new ProduceRequestData.TopicProduceData()
.setTopicId(getTopicIds().get(tp.topic()).get).setPartitionData(util.List.of(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
@ -249,17 +295,17 @@ class RequestQuotaTest extends BaseRequestTest {
FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion, 0, 0, partitionMap)
case ApiKeys.METADATA =>
new MetadataRequest.Builder(List(topic).asJava, true)
new MetadataRequest.Builder(util.List.of(topic), true)
case ApiKeys.LIST_OFFSETS =>
val topic = new ListOffsetsTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetsPartition()
.setPartitions(util.List.of(new ListOffsetsPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
.setCurrentLeaderEpoch(15)))
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(List(topic).asJava)
.setTargetTimes(util.List.of(topic))
case ApiKeys.OFFSET_COMMIT =>
OffsetCommitRequest.Builder.forTopicNames(
@ -268,11 +314,11 @@ class RequestQuotaTest extends BaseRequestTest {
.setGenerationIdOrMemberEpoch(1)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setTopics(
Collections.singletonList(
util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topic)
.setPartitions(
Collections.singletonList(
util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
@ -286,15 +332,15 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.OFFSET_FETCH =>
OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setGroups(List(
.setGroups(util.List.of(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("test-group")
.setTopics(List(
.setTopics(util.List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(tp.topic)
.setPartitionIndexes(List[Integer](tp.partition).asJava)
).asJava)
).asJava),
.setPartitionIndexes(util.List.of[Integer](tp.partition))
))
)),
false
)
@ -302,7 +348,7 @@ class RequestQuotaTest extends BaseRequestTest {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
.setCoordinatorKeys(Collections.singletonList("test-group")))
.setCoordinatorKeys(util.List.of("test-group")))
case ApiKeys.JOIN_GROUP =>
new JoinGroupRequest.Builder(
@ -314,7 +360,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setProtocolType("consumer")
.setProtocols(
new JoinGroupRequestProtocolCollection(
Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
util.List.of(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("consumer-range")
.setMetadata("test".getBytes())).iterator()
)
@ -333,7 +379,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.LEAVE_GROUP =>
new LeaveGroupRequest.Builder(
"test-leave-group",
Collections.singletonList(
util.List.of(
new MemberIdentity()
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
)
@ -344,11 +390,11 @@ class RequestQuotaTest extends BaseRequestTest {
.setGroupId("test-sync-group")
.setGenerationId(1)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setAssignments(Collections.emptyList())
.setAssignments(util.List.of)
)
case ApiKeys.DESCRIBE_GROUPS =>
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(util.List.of("test-group")))
case ApiKeys.LIST_GROUPS =>
new ListGroupsRequest.Builder(new ListGroupsRequestData())
@ -365,23 +411,23 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_TOPICS =>
new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopicCollection(util.Set.of(
new CreatableTopic().setName("topic-2").setNumPartitions(1).
setReplicationFactor(1.toShort)).iterator())))
case ApiKeys.DELETE_TOPICS =>
new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
.setTopicNames(Collections.singletonList("topic-2"))
.setTopicNames(util.List.of("topic-2"))
.setTimeoutMs(5000))
case ApiKeys.DELETE_RECORDS =>
new DeleteRecordsRequest.Builder(
new DeleteRecordsRequestData()
.setTimeoutMs(5000)
.setTopics(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsTopic()
.setTopics(util.List.of(new DeleteRecordsRequestData.DeleteRecordsTopic()
.setName(tp.topic())
.setPartitions(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsPartition()
.setPartitions(util.List.of(new DeleteRecordsRequestData.DeleteRecordsPartition()
.setPartitionIndex(tp.partition())
.setOffset(0L))))))
@ -395,14 +441,14 @@ class RequestQuotaTest extends BaseRequestTest {
val epochs = new OffsetForLeaderTopicCollection()
epochs.add(new OffsetForLeaderTopic()
.setTopic(tp.topic())
.setPartitions(List(new OffsetForLeaderPartition()
.setPartitions(util.List.of(new OffsetForLeaderPartition()
.setPartition(tp.partition())
.setLeaderEpoch(0)
.setCurrentLeaderEpoch(15)).asJava))
.setCurrentLeaderEpoch(15))))
OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs)
case ApiKeys.ADD_PARTITIONS_TO_TXN =>
AddPartitionsToTxnRequest.Builder.forClient("test-transactional-id", 1, 0, List(tp).asJava)
AddPartitionsToTxnRequest.Builder.forClient("test-transactional-id", 1, 0, util.List.of(tp))
case ApiKeys.ADD_OFFSETS_TO_TXN =>
new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData()
@ -430,7 +476,7 @@ class RequestQuotaTest extends BaseRequestTest {
"test-txn-group",
2,
0,
Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava,
util.Map.of[TopicPartition, TxnOffsetCommitRequest.CommittedOffset],
true
)
@ -438,7 +484,7 @@ class RequestQuotaTest extends BaseRequestTest {
new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
case ApiKeys.CREATE_ACLS =>
new CreateAclsRequest.Builder(new CreateAclsRequestData().setCreations(Collections.singletonList(
new CreateAclsRequest.Builder(new CreateAclsRequestData().setCreations(util.List.of(
new CreateAclsRequestData.AclCreation()
.setResourceType(AdminResourceType.TOPIC.code)
.setResourceName("mytopic")
@ -448,7 +494,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setOperation(AclOperation.WRITE.code)
.setPermissionType(AclPermissionType.DENY.code))))
case ApiKeys.DELETE_ACLS =>
new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(Collections.singletonList(
new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(util.List.of(
new DeleteAclsRequestData.DeleteAclsFilter()
.setResourceTypeFilter(AdminResourceType.TOPIC.code)
.setResourceNameFilter(null)
@ -459,14 +505,14 @@ class RequestQuotaTest extends BaseRequestTest {
.setPermissionType(AclPermissionType.DENY.code))))
case ApiKeys.DESCRIBE_CONFIGS =>
new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
.setResources(Collections.singletonList(new DescribeConfigsRequestData.DescribeConfigsResource()
.setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource()
.setResourceType(ConfigResource.Type.TOPIC.id)
.setResourceName(tp.topic))))
case ApiKeys.ALTER_CONFIGS =>
new AlterConfigsRequest.Builder(
Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
new AlterConfigsRequest.Config(Collections.singleton(
util.Map.of(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
new AlterConfigsRequest.Config(util.Set.of(
new AlterConfigsRequest.ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "1000000")
))), true)
@ -475,7 +521,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setPath(logDir)
dir.topics.add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic()
.setName(tp.topic)
.setPartitions(Collections.singletonList(tp.partition)))
.setPartitions(util.List.of(tp.partition)))
val data = new AlterReplicaLogDirsRequestData()
data.dirs.add(dir)
new AlterReplicaLogDirsRequest.Builder(data)
@ -484,7 +530,7 @@ class RequestQuotaTest extends BaseRequestTest {
val data = new DescribeLogDirsRequestData()
data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic()
.setTopic(tp.topic)
.setPartitions(Collections.singletonList(tp.partition)))
.setPartitions(util.List.of(tp.partition)))
new DescribeLogDirsRequest.Builder(data)
case ApiKeys.CREATE_PARTITIONS =>
@ -497,7 +543,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_DELEGATION_TOKEN =>
new CreateDelegationTokenRequest.Builder(
new CreateDelegationTokenRequestData()
.setRenewers(Collections.singletonList(new CreateDelegationTokenRequestData.CreatableRenewers()
.setRenewers(util.List.of(new CreateDelegationTokenRequestData.CreatableRenewers()
.setPrincipalType("User")
.setPrincipalName("test")))
.setMaxLifetimeMs(1000)
@ -510,7 +556,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setExpiryTimePeriodMs(1000L))
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
new DescribeDelegationTokenRequest.Builder(util.List.of(SecurityUtils.parseKafkaPrincipal("User:test")))
case ApiKeys.RENEW_DELEGATION_TOKEN =>
new RenewDelegationTokenRequest.Builder(
@ -520,12 +566,12 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(new DeleteGroupsRequestData()
.setGroupsNames(Collections.singletonList("test-group")))
.setGroupsNames(util.List.of("test-group")))
case ApiKeys.ELECT_LEADERS =>
new ElectLeadersRequest.Builder(
ElectionType.PREFERRED,
Collections.singletonList(new TopicPartition("my_topic", 0)),
util.List.of(new TopicPartition("my_topic", 0)),
0
)
@ -548,9 +594,9 @@ class RequestQuotaTest extends BaseRequestTest {
new OffsetDeleteRequestData()
.setGroupId("test-group")
.setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(
Collections.singletonList(new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
util.List.of(new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("test-topic")
.setPartitions(Collections.singletonList(
.setPartitions(util.List.of(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition()
.setPartitionIndex(0)))).iterator())))
@ -558,7 +604,7 @@ class RequestQuotaTest extends BaseRequestTest {
new DescribeClientQuotasRequest.Builder(ClientQuotaFilter.all())
case ApiKeys.ALTER_CLIENT_QUOTAS =>
new AlterClientQuotasRequest.Builder(List.empty.asJava, false)
new AlterClientQuotasRequest.Builder(util.List.of, false)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS =>
new DescribeUserScramCredentialsRequest.Builder(new DescribeUserScramCredentialsRequestData())
@ -574,7 +620,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.END_QUORUM_EPOCH =>
new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest(
tp, 10, 5, Collections.singletonList(3)))
tp, 10, 5, util.List.of(3)))
case ApiKeys.DESCRIBE_QUORUM =>
new DescribeQuorumRequest.Builder(DescribeQuorumRequest.singletonRequest(
@ -593,7 +639,7 @@ class RequestQuotaTest extends BaseRequestTest {
"client-id",
0
)
val embedRequestData = new AlterClientQuotasRequest.Builder(List.empty.asJava, false).build()
val embedRequestData = new AlterClientQuotasRequest.Builder(util.List.of, false).build()
.serializeWithHeader(requestHeader)
new EnvelopeRequest.Builder(embedRequestData, new Array[Byte](0),
InetAddress.getByName("192.168.1.1").getAddress)
@ -603,9 +649,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_PRODUCERS =>
new DescribeProducersRequest.Builder(new DescribeProducersRequestData()
.setTopics(List(new DescribeProducersRequestData.TopicRequest()
.setTopics(util.List.of(new DescribeProducersRequestData.TopicRequest()
.setName("test-topic")
.setPartitionIndexes(List(1, 2, 3).map(Int.box).asJava)).asJava))
.setPartitionIndexes(util.List.of[Integer](1, 2, 3)))))
case ApiKeys.BROKER_REGISTRATION =>
new BrokerRegistrationRequest.Builder(new BrokerRegistrationRequestData())
@ -618,7 +664,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_TRANSACTIONS =>
new DescribeTransactionsRequest.Builder(new DescribeTransactionsRequestData()
.setTransactionalIds(List("test-transactional-id").asJava))
.setTransactionalIds(util.List.of("test-transactional-id")))
case ApiKeys.LIST_TRANSACTIONS =>
new ListTransactionsRequest.Builder(new ListTransactionsRequestData())
@ -654,7 +700,23 @@ class RequestQuotaTest extends BaseRequestTest {
new ShareGroupDescribeRequest.Builder(new ShareGroupDescribeRequestData())
case ApiKeys.SHARE_FETCH =>
new ShareFetchRequest.Builder(new ShareFetchRequestData())
new ShareFetchRequest.Builder(
new ShareFetchRequestData()
.setGroupId("test-share-group")
.setMemberId(Uuid.randomUuid().toString)
.setShareSessionEpoch(0)
.setMaxWaitMs(0)
.setMinBytes(1)
.setMaxBytes(1000000)
.setTopics(new ShareFetchRequestData.FetchTopicCollection(
util.List.of(new ShareFetchRequestData.FetchTopic()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(
new ShareFetchRequestData.FetchPartitionCollection(
util.List.of(new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tp.partition)
).iterator))
).iterator)))
case ApiKeys.SHARE_ACKNOWLEDGE =>
new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData())
@ -792,6 +854,21 @@ class RequestQuotaTest extends BaseRequestTest {
s"Throttle time metrics for request quota updated: $smallQuotaConsumerClient")
}
private def checkSmallQuotaShareFetchRequestThrottleTime(): Unit = {
// Request until throttled using client-id with default small consumer quota
// This test verifies ShareFetch is throttled similarly to Fetch (KIP-932)
val smallQuotaShareFetchClient = Client(smallQuotaConsumerClientId, ApiKeys.SHARE_FETCH)
val throttled = smallQuotaShareFetchClient.runUntil(_.throttleTimeMs > 0)
assertTrue(throttled, s"ShareFetch response not throttled: $smallQuotaShareFetchClient")
// KIP-932: ShareFetch should use the same quota and sensors as Fetch
// Since the implementation uses the same quota mechanisms, we verify throttling occurs
assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.FETCH) > 0,
s"ShareFetch should be throttled using FETCH quota sensors: $smallQuotaShareFetchClient")
assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.REQUEST).isNaN,
s"Throttle time metrics for request quota updated: $smallQuotaShareFetchClient")
}
private def checkUnthrottledClient(apiKey: ApiKeys): Unit = {
// Test that request from client with large quota is not throttled
@ -832,9 +909,12 @@ object RequestQuotaTest {
class KraftTestAuthorizer extends StandardAuthorizer {
override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
actions.asScala.map { _ =>
if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
}.asJava
val results = new util.ArrayList[AuthorizationResult]()
actions.forEach(_ => {
val result = if (requestContext.principal != UnauthorizedPrincipal) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
results.add(result)
})
results
}
}