mirror of https://github.com/apache/kafka.git
MINOR: Move ClientQuotasRequestTest to server module (#20053)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
1. Move ClientQuotasRequestTest to server module. 2. Rewrite ClientQuotasRequestTest in Java. Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
c162d2eb14
commit
9a2f202a1e
|
@ -1,592 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package kafka.server
|
|
||||||
|
|
||||||
import java.net.InetAddress
|
|
||||||
import java.util
|
|
||||||
import java.util.concurrent.{ExecutionException, TimeUnit}
|
|
||||||
import org.apache.kafka.common.test.api.ClusterTest
|
|
||||||
import kafka.utils.TestUtils
|
|
||||||
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
|
|
||||||
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
|
|
||||||
import org.apache.kafka.common.internals.KafkaFutureImpl
|
|
||||||
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
|
||||||
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
|
|
||||||
import org.apache.kafka.common.test.ClusterInstance
|
|
||||||
import org.apache.kafka.server.IntegrationTestUtils
|
|
||||||
import org.apache.kafka.server.config.QuotaConfig
|
|
||||||
import org.junit.jupiter.api.Assertions._
|
|
||||||
import org.junit.jupiter.api.Disabled
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
|
|
||||||
class ClientQuotasRequestTest(cluster: ClusterInstance) {
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterClientQuotasRequest(): Unit = {
|
|
||||||
|
|
||||||
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.CLIENT_ID -> "client-id").asJava)
|
|
||||||
|
|
||||||
// Expect an empty configuration.
|
|
||||||
verifyDescribeEntityQuotas(entity, Map.empty)
|
|
||||||
|
|
||||||
// Add two configuration entries.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
|
|
||||||
// Update an existing entry.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(15000.0)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 15000.0,
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
|
|
||||||
// Remove an existing configuration entry.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> None
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
|
|
||||||
// Remove a non-existent configuration entry. This should make no changes.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
|
|
||||||
// Add back a deleted configuration entry.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(5000.0)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 5000.0,
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
|
|
||||||
// Perform a mixed update.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> None,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.3)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 12.3
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterClientQuotasRequestValidateOnly(): Unit = {
|
|
||||||
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
|
|
||||||
|
|
||||||
// Set up a configuration.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(23.45)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
|
|
||||||
))
|
|
||||||
|
|
||||||
// Validate-only addition.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0)
|
|
||||||
), validateOnly = true)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
|
|
||||||
))
|
|
||||||
|
|
||||||
// Validate-only modification.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0)
|
|
||||||
), validateOnly = true)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
|
|
||||||
))
|
|
||||||
|
|
||||||
// Validate-only removal.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
|
|
||||||
), validateOnly = true)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
|
|
||||||
))
|
|
||||||
|
|
||||||
// Validate-only mixed update.
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0),
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
|
|
||||||
), validateOnly = true)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
@Disabled("TODO: KAFKA-17630 - Convert ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft")
|
|
||||||
@ClusterTest
|
|
||||||
def testClientQuotasForScramUsers(): Unit = {
|
|
||||||
val userName = "user"
|
|
||||||
|
|
||||||
val admin = cluster.admin()
|
|
||||||
try {
|
|
||||||
val results = admin.alterUserScramCredentials(util.Arrays.asList(
|
|
||||||
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
|
|
||||||
results.all.get
|
|
||||||
|
|
||||||
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map.empty)
|
|
||||||
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
|
|
||||||
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
|
|
||||||
))
|
|
||||||
} finally {
|
|
||||||
admin.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterIpQuotasRequest(): Unit = {
|
|
||||||
val knownHost = "1.2.3.4"
|
|
||||||
val unknownHost = "2.3.4.5"
|
|
||||||
val entity = toIpEntity(Some(knownHost))
|
|
||||||
val defaultEntity = toIpEntity(Some(null))
|
|
||||||
val entityFilter = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost)
|
|
||||||
val defaultEntityFilter = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP)
|
|
||||||
val allIpEntityFilter = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
|
|
||||||
|
|
||||||
def verifyIpQuotas(entityFilter: ClientQuotaFilterComponent, expectedMatches: Map[ClientQuotaEntity, Double]): Unit = {
|
|
||||||
TestUtils.tryUntilNoAssertionError() {
|
|
||||||
val result = describeClientQuotas(ClientQuotaFilter.containsOnly(List(entityFilter).asJava))
|
|
||||||
assertEquals(expectedMatches.keySet, result.asScala.keySet)
|
|
||||||
result.asScala.foreach { case (entity, props) =>
|
|
||||||
assertEquals(Set(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG), props.asScala.keySet)
|
|
||||||
assertEquals(expectedMatches(entity), props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG))
|
|
||||||
val entityName = entity.entries.get(ClientQuotaEntity.IP)
|
|
||||||
// ClientQuotaEntity with null name maps to default entity
|
|
||||||
val entityIp = if (entityName == null)
|
|
||||||
InetAddress.getByName(unknownHost)
|
|
||||||
else
|
|
||||||
InetAddress.getByName(entityName)
|
|
||||||
var currentServerQuota = 0
|
|
||||||
currentServerQuota = cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
|
|
||||||
assertTrue(Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01,
|
|
||||||
s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expect an empty configuration.
|
|
||||||
verifyIpQuotas(allIpEntityFilter, Map.empty)
|
|
||||||
|
|
||||||
// Add a configuration entry.
|
|
||||||
alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(100.0)), validateOnly = false)
|
|
||||||
verifyIpQuotas(entityFilter, Map(entity -> 100.0))
|
|
||||||
|
|
||||||
// update existing entry
|
|
||||||
alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(150.0)), validateOnly = false)
|
|
||||||
verifyIpQuotas(entityFilter, Map(entity -> 150.0))
|
|
||||||
|
|
||||||
// update default value
|
|
||||||
alterEntityQuotas(defaultEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(200.0)), validateOnly = false)
|
|
||||||
verifyIpQuotas(defaultEntityFilter, Map(defaultEntity -> 200.0))
|
|
||||||
|
|
||||||
// describe all IP quotas
|
|
||||||
verifyIpQuotas(allIpEntityFilter, Map(entity -> 150.0, defaultEntity -> 200.0))
|
|
||||||
|
|
||||||
// remove entry
|
|
||||||
alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly = false)
|
|
||||||
verifyIpQuotas(entityFilter, Map.empty)
|
|
||||||
|
|
||||||
// remove default value
|
|
||||||
alterEntityQuotas(defaultEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly = false)
|
|
||||||
verifyIpQuotas(allIpEntityFilter, Map.empty)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterClientQuotasInvalidRequests(): Unit = {
|
|
||||||
var entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "").asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true))
|
|
||||||
|
|
||||||
entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "").asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true))
|
|
||||||
|
|
||||||
entity = new ClientQuotaEntity(Map("" -> "name").asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true))
|
|
||||||
|
|
||||||
entity = new ClientQuotaEntity(Map.empty.asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.5)), validateOnly = true))
|
|
||||||
|
|
||||||
entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true))
|
|
||||||
|
|
||||||
entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
|
|
||||||
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.5)), validateOnly = true))
|
|
||||||
}
|
|
||||||
|
|
||||||
private def expectInvalidRequestWithMessage(runnable: => Unit, expectedMessage: String): Unit = {
|
|
||||||
val exception = assertThrows(classOf[InvalidRequestException], () => runnable)
|
|
||||||
assertTrue(exception.getMessage.contains(expectedMessage), s"Expected message $exception to contain $expectedMessage")
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterClientQuotasInvalidEntityCombination(): Unit = {
|
|
||||||
val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
|
|
||||||
val clientAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
|
|
||||||
val expectedExceptionMessage = "Invalid quota entity combination"
|
|
||||||
expectInvalidRequestWithMessage(alterEntityQuotas(userAndIpEntity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)),
|
|
||||||
validateOnly = true), expectedExceptionMessage)
|
|
||||||
expectInvalidRequestWithMessage(alterEntityQuotas(clientAndIpEntity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)),
|
|
||||||
validateOnly = true), expectedExceptionMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testAlterClientQuotasBadIp(): Unit = {
|
|
||||||
val invalidHostPatternEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "not a valid host because it has spaces").asJava)
|
|
||||||
val unresolvableHostEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "RFC2606.invalid").asJava)
|
|
||||||
val expectedExceptionMessage = "not a valid IP"
|
|
||||||
expectInvalidRequestWithMessage(alterEntityQuotas(invalidHostPatternEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)),
|
|
||||||
validateOnly = true), expectedExceptionMessage)
|
|
||||||
expectInvalidRequestWithMessage(alterEntityQuotas(unresolvableHostEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)),
|
|
||||||
validateOnly = true), expectedExceptionMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
|
|
||||||
val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
|
|
||||||
val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
|
|
||||||
val clientIdFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
|
|
||||||
val expectedExceptionMessage = "Invalid entity filter component combination"
|
|
||||||
expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent, userFilterComponent).asJava)),
|
|
||||||
expectedExceptionMessage)
|
|
||||||
expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent, clientIdFilterComponent).asJava)),
|
|
||||||
expectedExceptionMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Entities to be matched against.
|
|
||||||
private val matchUserClientEntities = List(
|
|
||||||
(Some("user-1"), Some("client-id-1"), 50.50),
|
|
||||||
(Some("user-2"), Some("client-id-1"), 51.51),
|
|
||||||
(Some("user-3"), Some("client-id-2"), 52.52),
|
|
||||||
(Some(null), Some("client-id-1"), 53.53),
|
|
||||||
(Some("user-1"), Some(null), 54.54),
|
|
||||||
(Some("user-3"), Some(null), 55.55),
|
|
||||||
(Some("user-1"), None, 56.56),
|
|
||||||
(Some("user-2"), None, 57.57),
|
|
||||||
(Some("user-3"), None, 58.58),
|
|
||||||
(Some(null), None, 59.59),
|
|
||||||
(None, Some("client-id-2"), 60.60)
|
|
||||||
).map { case (u, c, v) => (toClientEntity(u, c), v) }
|
|
||||||
|
|
||||||
private val matchIpEntities = List(
|
|
||||||
(Some("1.2.3.4"), 10.0),
|
|
||||||
(Some("2.3.4.5"), 20.0)
|
|
||||||
).map { case (ip, quota) => (toIpEntity(ip), quota)}
|
|
||||||
|
|
||||||
private def setupDescribeClientQuotasMatchTest(): Unit = {
|
|
||||||
val userClientQuotas = matchUserClientEntities.map { case (e, v) =>
|
|
||||||
e -> Map((QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Some(v)))
|
|
||||||
}.toMap
|
|
||||||
val ipQuotas = matchIpEntities.map { case (e, v) =>
|
|
||||||
e -> Map((QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Some(v)))
|
|
||||||
}.toMap
|
|
||||||
val result = alterClientQuotas(userClientQuotas ++ ipQuotas, validateOnly = false)
|
|
||||||
(matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS))
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testDescribeClientQuotasMatchExact(): Unit = {
|
|
||||||
setupDescribeClientQuotasMatchTest()
|
|
||||||
|
|
||||||
def matchEntity(entity: ClientQuotaEntity) = {
|
|
||||||
val components = entity.entries.asScala.map { case (entityType, entityName) =>
|
|
||||||
entityName match {
|
|
||||||
case null => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
|
|
||||||
case name => ClientQuotaFilterComponent.ofEntity(entityType, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test exact matches.
|
|
||||||
matchUserClientEntities.foreach { case (e, v) =>
|
|
||||||
TestUtils.tryUntilNoAssertionError() {
|
|
||||||
val result = matchEntity(e)
|
|
||||||
assertEquals(1, result.size)
|
|
||||||
assertTrue(result.get(e) != null)
|
|
||||||
val value = result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG)
|
|
||||||
assertNotNull(value)
|
|
||||||
assertEquals(value, v, 1e-6)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Entities not contained in `matchEntityList`.
|
|
||||||
val notMatchEntities = List(
|
|
||||||
(Some("user-1"), Some("client-id-2")),
|
|
||||||
(Some("user-3"), Some("client-id-1")),
|
|
||||||
(Some("user-2"), Some(null)),
|
|
||||||
(Some("user-4"), None),
|
|
||||||
(Some(null), Some("client-id-2")),
|
|
||||||
(None, Some("client-id-1")),
|
|
||||||
(None, Some("client-id-3")),
|
|
||||||
).map { case (u, c) =>
|
|
||||||
new ClientQuotaEntity((u.map((ClientQuotaEntity.USER, _)) ++
|
|
||||||
c.map((ClientQuotaEntity.CLIENT_ID, _))).toMap.asJava)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify exact matches of the non-matches returns empty.
|
|
||||||
notMatchEntities.foreach { e =>
|
|
||||||
val result = matchEntity(e)
|
|
||||||
assertEquals(0, result.size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testDescribeClientQuotasMatchPartial(): Unit = {
|
|
||||||
setupDescribeClientQuotasMatchTest()
|
|
||||||
|
|
||||||
def testMatchEntities(filter: ClientQuotaFilter, expectedMatchSize: Int, partition: ClientQuotaEntity => Boolean): Unit = {
|
|
||||||
TestUtils.tryUntilNoAssertionError() {
|
|
||||||
val result = describeClientQuotas(filter)
|
|
||||||
val (expectedMatches, _) = (matchUserClientEntities ++ matchIpEntities).partition(e => partition(e._1))
|
|
||||||
assertEquals(expectedMatchSize, expectedMatches.size) // for test verification
|
|
||||||
assertEquals(expectedMatchSize, result.size, s"Failed to match $expectedMatchSize entities for $filter")
|
|
||||||
val expectedMatchesMap = expectedMatches.toMap
|
|
||||||
matchUserClientEntities.foreach { case (entity, expectedValue) =>
|
|
||||||
if (expectedMatchesMap.contains(entity)) {
|
|
||||||
val config = result.get(entity)
|
|
||||||
assertNotNull(config)
|
|
||||||
val value = config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG)
|
|
||||||
assertNotNull(value)
|
|
||||||
assertEquals(expectedValue, value, 1e-6)
|
|
||||||
} else {
|
|
||||||
assertNull(result.get(entity))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
matchIpEntities.foreach { case (entity, expectedValue) =>
|
|
||||||
if (expectedMatchesMap.contains(entity)) {
|
|
||||||
val config = result.get(entity)
|
|
||||||
assertNotNull(config)
|
|
||||||
val value = config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG)
|
|
||||||
assertNotNull(value)
|
|
||||||
assertEquals(expectedValue, value, 1e-6)
|
|
||||||
} else {
|
|
||||||
assertNull(result.get(entity))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Match open-ended existing user.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1")).asJava), 3,
|
|
||||||
entity => entity.entries.get(ClientQuotaEntity.USER) == "user-1"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match open-ended non-existent user.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown")).asJava), 0,
|
|
||||||
entity => false
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match open-ended existing client ID.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2")).asJava), 2,
|
|
||||||
entity => entity.entries.get(ClientQuotaEntity.CLIENT_ID) == "client-id-2"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match open-ended default user.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)).asJava), 2,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.USER) && entity.entries.get(ClientQuotaEntity.USER) == null
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match close-ended existing user.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2")).asJava), 1,
|
|
||||||
entity => entity.entries.get(ClientQuotaEntity.USER) == "user-2" && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match close-ended existing client ID that has no matching entity.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1")).asJava), 0,
|
|
||||||
entity => false
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match against all entities with the user type in a close-ended match.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 4,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.USER) && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match against all entities with the user type in an open-ended match.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 10,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.USER)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match against all entities with the client ID type in a close-ended match.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 1,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries.containsKey(ClientQuotaEntity.USER)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match against all entities with the client ID type in an open-ended match.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 7,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match against all entities with IP type in an open-ended match.
|
|
||||||
testMatchEntities(
|
|
||||||
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)).asJava), 2,
|
|
||||||
entity => entity.entries.containsKey(ClientQuotaEntity.IP)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Match open-ended empty filter list. This should match all entities.
|
|
||||||
testMatchEntities(ClientQuotaFilter.contains(List.empty.asJava), 13, entity => true)
|
|
||||||
|
|
||||||
// Match close-ended empty filter list. This should match no entities.
|
|
||||||
testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, _ => false)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testClientQuotasUnsupportedEntityTypes(): Unit = {
|
|
||||||
val entity = new ClientQuotaEntity(Map("other" -> "name").asJava)
|
|
||||||
assertThrows(classOf[UnsupportedVersionException], () => verifyDescribeEntityQuotas(entity, Map.empty))
|
|
||||||
}
|
|
||||||
|
|
||||||
@ClusterTest
|
|
||||||
def testClientQuotasSanitized(): Unit = {
|
|
||||||
// An entity with name that must be sanitized when writing to Zookeeper.
|
|
||||||
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user with spaces").asJava)
|
|
||||||
|
|
||||||
alterEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
|
|
||||||
), validateOnly = false)
|
|
||||||
|
|
||||||
verifyDescribeEntityQuotas(entity, Map(
|
|
||||||
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas: Map[String, Double]): Unit = {
|
|
||||||
TestUtils.tryUntilNoAssertionError(waitTime = 5000L) {
|
|
||||||
val components = entity.entries.asScala.map { case (entityType, entityName) =>
|
|
||||||
Option(entityName).map{ name => ClientQuotaFilterComponent.ofEntity(entityType, name)}
|
|
||||||
.getOrElse(ClientQuotaFilterComponent.ofDefaultEntity(entityType)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
val describe = describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
|
|
||||||
if (quotas.isEmpty) {
|
|
||||||
assertEquals(0, describe.size)
|
|
||||||
} else {
|
|
||||||
assertEquals(1, describe.size)
|
|
||||||
val configs = describe.get(entity)
|
|
||||||
assertNotNull(configs)
|
|
||||||
assertEquals(quotas.size, configs.size)
|
|
||||||
quotas.foreach { case (k, v) =>
|
|
||||||
val value = configs.get(k)
|
|
||||||
assertNotNull(value)
|
|
||||||
assertEquals(v, value, 1e-6)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def toClientEntity(user: Option[String], clientId: Option[String]) =
|
|
||||||
new ClientQuotaEntity((user.map(ClientQuotaEntity.USER -> _) ++ clientId.map(ClientQuotaEntity.CLIENT_ID -> _)).toMap.asJava)
|
|
||||||
|
|
||||||
private def toIpEntity(ip: Option[String]) = new ClientQuotaEntity(ip.map(ClientQuotaEntity.IP -> _).toMap.asJava)
|
|
||||||
|
|
||||||
private def describeClientQuotas(filter: ClientQuotaFilter) = {
|
|
||||||
val result = new KafkaFutureImpl[java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]]]
|
|
||||||
sendDescribeClientQuotasRequest(filter).complete(result)
|
|
||||||
try result.get catch {
|
|
||||||
case e: ExecutionException => throw e.getCause
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
|
|
||||||
val request = new DescribeClientQuotasRequest.Builder(filter).build()
|
|
||||||
IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](
|
|
||||||
request,
|
|
||||||
cluster.boundPorts().get(0))
|
|
||||||
}
|
|
||||||
|
|
||||||
private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
|
|
||||||
try alterClientQuotas(Map(entity -> alter), validateOnly)(entity).get(10, TimeUnit.SECONDS) catch {
|
|
||||||
case e: ExecutionException => throw e.getCause
|
|
||||||
}
|
|
||||||
|
|
||||||
private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, Option[Double]]], validateOnly: Boolean) = {
|
|
||||||
val entries = request.map { case (entity, alter) =>
|
|
||||||
val ops = alter.map { case (key, value) =>
|
|
||||||
new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
|
|
||||||
}.asJavaCollection
|
|
||||||
new ClientQuotaAlteration(entity, ops)
|
|
||||||
}
|
|
||||||
|
|
||||||
val response = request.map(e => e._1 -> new KafkaFutureImpl[Void]).asJava
|
|
||||||
sendAlterClientQuotasRequest(entries, validateOnly).complete(response)
|
|
||||||
val result = response.asScala
|
|
||||||
assertEquals(request.size, result.size)
|
|
||||||
request.foreach(e => assertTrue(result.contains(e._1)))
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
|
|
||||||
val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
|
|
||||||
IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](
|
|
||||||
request,
|
|
||||||
cluster.boundPorts().get(0))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,726 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.server.quota;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
|
||||||
|
import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
|
||||||
|
import org.apache.kafka.clients.admin.ScramCredentialInfo;
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
|
||||||
|
import org.apache.kafka.common.KafkaFuture;
|
||||||
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||||
|
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||||
|
import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
||||||
|
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||||
|
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
||||||
|
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
|
||||||
|
import org.apache.kafka.common.test.ClusterInstance;
|
||||||
|
import org.apache.kafka.common.test.api.ClusterTest;
|
||||||
|
import org.apache.kafka.server.config.QuotaConfig;
|
||||||
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class ClientQuotasRequestTest {
|
||||||
|
private final ClusterInstance cluster;
|
||||||
|
|
||||||
|
public ClientQuotasRequestTest(ClusterInstance cluster) {
|
||||||
|
this.cluster = cluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterClientQuotasRequest() throws InterruptedException {
|
||||||
|
ClientQuotaEntity entity = new ClientQuotaEntity(
|
||||||
|
Map.of(ClientQuotaEntity.USER, "user", ClientQuotaEntity.CLIENT_ID, "client-id"));
|
||||||
|
|
||||||
|
// Expect an empty configuration.
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of());
|
||||||
|
|
||||||
|
// Add two configuration entries.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0),
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0,
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
|
||||||
|
// Update an existing entry.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(15000.0)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 15000.0,
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
|
||||||
|
// Remove an existing configuration entry.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty()
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
|
||||||
|
// Remove a non-existent configuration entry. This should make no changes.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
|
||||||
|
// Add back a deleted configuration entry.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(5000.0)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 5000.0,
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
|
||||||
|
// Perform a mixed update.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0),
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty(),
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.3)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 12.3
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterClientQuotasRequestValidateOnly() throws InterruptedException {
|
||||||
|
ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
|
||||||
|
|
||||||
|
// Set up a configuration.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0),
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(23.45)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
|
||||||
|
));
|
||||||
|
|
||||||
|
// Validate-only addition.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(50000.0)
|
||||||
|
), true);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
|
||||||
|
));
|
||||||
|
|
||||||
|
// Validate-only modification.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0)
|
||||||
|
), true);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
|
||||||
|
));
|
||||||
|
|
||||||
|
// Validate-only removal.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
|
||||||
|
), true);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
|
||||||
|
));
|
||||||
|
|
||||||
|
// Validate-only mixed update.
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0),
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(50000.0),
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
|
||||||
|
), true);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
|
||||||
|
QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClientQuotasForScramUsers() throws InterruptedException, ExecutionException {
|
||||||
|
final String userName = "user";
|
||||||
|
|
||||||
|
try (Admin admin = cluster.admin()) {
|
||||||
|
AlterUserScramCredentialsResult results = admin.alterUserScramCredentials(List.of(
|
||||||
|
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")));
|
||||||
|
results.all().get();
|
||||||
|
|
||||||
|
ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, userName));
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of());
|
||||||
|
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0),
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0,
|
||||||
|
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterIpQuotasRequest() throws InterruptedException {
|
||||||
|
final String knownHost = "1.2.3.4";
|
||||||
|
final String unknownHost = "2.3.4.5";
|
||||||
|
ClientQuotaEntity entity = toIpEntity(Optional.of(knownHost));
|
||||||
|
ClientQuotaEntity defaultEntity = toIpEntity(Optional.empty());
|
||||||
|
ClientQuotaFilterComponent entityFilter = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost);
|
||||||
|
ClientQuotaFilterComponent defaultEntityFilter = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP);
|
||||||
|
ClientQuotaFilterComponent allIpEntityFilter = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP);
|
||||||
|
|
||||||
|
// Expect an empty configuration.
|
||||||
|
verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost);
|
||||||
|
|
||||||
|
// Add a configuration entry.
|
||||||
|
alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(100.0)), false);
|
||||||
|
verifyIpQuotas(entityFilter, Map.of(entity, 100.0), unknownHost);
|
||||||
|
|
||||||
|
// update existing entry
|
||||||
|
alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(150.0)), false);
|
||||||
|
verifyIpQuotas(entityFilter, Map.of(entity, 150.0), unknownHost);
|
||||||
|
|
||||||
|
// update default value
|
||||||
|
alterEntityQuotas(defaultEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(200.0)), false);
|
||||||
|
verifyIpQuotas(defaultEntityFilter, Map.of(defaultEntity, 200.0), unknownHost);
|
||||||
|
|
||||||
|
// describe all IP quotas
|
||||||
|
verifyIpQuotas(allIpEntityFilter, Map.of(entity, 150.0, defaultEntity, 200.0), unknownHost);
|
||||||
|
|
||||||
|
// remove entry
|
||||||
|
alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()), false);
|
||||||
|
verifyIpQuotas(entityFilter, Map.of(), unknownHost);
|
||||||
|
|
||||||
|
// remove default value
|
||||||
|
alterEntityQuotas(defaultEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()), false);
|
||||||
|
verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyIpQuotas(ClientQuotaFilterComponent entityFilter, Map<ClientQuotaEntity, Double> expectedMatches,
|
||||||
|
String unknownHost) throws InterruptedException {
|
||||||
|
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
|
||||||
|
Map<ClientQuotaEntity, Map<String, Double>> result = describeClientQuotas(
|
||||||
|
ClientQuotaFilter.containsOnly(List.of(entityFilter))).get();
|
||||||
|
assertEquals(expectedMatches.keySet(), result.keySet());
|
||||||
|
|
||||||
|
for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : result.entrySet()) {
|
||||||
|
ClientQuotaEntity entity = entry.getKey();
|
||||||
|
Map<String, Double> props = entry.getValue();
|
||||||
|
assertEquals(Set.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG), props.keySet());
|
||||||
|
assertEquals(expectedMatches.get(entity), props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG));
|
||||||
|
String entityName = entity.entries().get(ClientQuotaEntity.IP);
|
||||||
|
// ClientQuotaEntity with null name maps to default entity
|
||||||
|
InetAddress entityIp = entityName == null
|
||||||
|
? InetAddress.getByName(unknownHost)
|
||||||
|
: InetAddress.getByName(entityName);
|
||||||
|
int currentServerQuota = cluster.brokers()
|
||||||
|
.values()
|
||||||
|
.iterator()
|
||||||
|
.next()
|
||||||
|
.socketServer()
|
||||||
|
.connectionQuotas()
|
||||||
|
.connectionRateForIp(entityIp);
|
||||||
|
assertTrue(Math.abs(expectedMatches.get(entity) - currentServerQuota) < 0.01,
|
||||||
|
String.format("Connection quota of %s is not %s but %s", entity, expectedMatches.get(entity), currentServerQuota));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterClientQuotasInvalidRequests() {
|
||||||
|
final ClientQuotaEntity entity1 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, ""));
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity1, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true));
|
||||||
|
|
||||||
|
final ClientQuotaEntity entity2 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.CLIENT_ID, ""));
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity2, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true));
|
||||||
|
|
||||||
|
final ClientQuotaEntity entity3 = new ClientQuotaEntity(Map.of("", "name"));
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity3, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true));
|
||||||
|
|
||||||
|
final ClientQuotaEntity entity4 = new ClientQuotaEntity(Map.of());
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity4, Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)), true));
|
||||||
|
|
||||||
|
final ClientQuotaEntity entity5 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity5, Map.of("bad", Optional.of(1.0)), true));
|
||||||
|
|
||||||
|
final ClientQuotaEntity entity6 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
|
||||||
|
TestUtils.assertFutureThrows(InvalidRequestException.class,
|
||||||
|
alterEntityQuotas(entity6, Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)), true));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectInvalidRequestWithMessage(Future<?> future, String expectedMessage) {
|
||||||
|
InvalidRequestException exception = TestUtils.assertFutureThrows(InvalidRequestException.class, future);
|
||||||
|
assertNotNull(exception);
|
||||||
|
assertTrue(
|
||||||
|
exception.getMessage().contains(expectedMessage),
|
||||||
|
String.format("Expected message %s to contain %s", exception, expectedMessage)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterClientQuotasInvalidEntityCombination() {
|
||||||
|
ClientQuotaEntity userAndIpEntity = new ClientQuotaEntity(
|
||||||
|
Map.of(ClientQuotaEntity.USER, "user", ClientQuotaEntity.IP, "1.2.3.4")
|
||||||
|
);
|
||||||
|
ClientQuotaEntity clientAndIpEntity = new ClientQuotaEntity(
|
||||||
|
Map.of(ClientQuotaEntity.CLIENT_ID, "client", ClientQuotaEntity.IP, "1.2.3.4")
|
||||||
|
);
|
||||||
|
final String expectedExceptionMessage = "Invalid quota entity combination";
|
||||||
|
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
alterEntityQuotas(userAndIpEntity, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
alterEntityQuotas(clientAndIpEntity, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAlterClientQuotasBadIp() {
|
||||||
|
ClientQuotaEntity invalidHostPatternEntity = new ClientQuotaEntity(
|
||||||
|
Map.of(ClientQuotaEntity.IP, "not a valid host because it has spaces")
|
||||||
|
);
|
||||||
|
ClientQuotaEntity unresolvableHostEntity = new ClientQuotaEntity(
|
||||||
|
Map.of(ClientQuotaEntity.IP, "RFC2606.invalid")
|
||||||
|
);
|
||||||
|
final String expectedExceptionMessage = "not a valid IP";
|
||||||
|
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
alterEntityQuotas(invalidHostPatternEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)), true),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
alterEntityQuotas(unresolvableHostEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)), true),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testDescribeClientQuotasInvalidFilterCombination() {
|
||||||
|
ClientQuotaFilterComponent ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP);
|
||||||
|
ClientQuotaFilterComponent userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER);
|
||||||
|
ClientQuotaFilterComponent clientIdFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID);
|
||||||
|
final String expectedExceptionMessage = "Invalid entity filter component combination";
|
||||||
|
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent, userFilterComponent))),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
expectInvalidRequestWithMessage(
|
||||||
|
describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent, clientIdFilterComponent))),
|
||||||
|
expectedExceptionMessage
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entities to be matched against.
|
||||||
|
private final Map<ClientQuotaEntity, Double> matchUserClientEntities = new HashMap<>(Map.ofEntries(
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-1"), toClientIdMap("client-id-1")), 50.50),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-2"), toClientIdMap("client-id-1")), 51.51),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-3"), toClientIdMap("client-id-2")), 52.52),
|
||||||
|
Map.entry(toClientEntity(toUserMap(null), toClientIdMap("client-id-1")), 53.53),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-1"), toClientIdMap(null)), 54.54),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-3"), toClientIdMap(null)), 55.55),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-1")), 56.56),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-2")), 57.57),
|
||||||
|
Map.entry(toClientEntity(toUserMap("user-3")), 58.58),
|
||||||
|
Map.entry(toClientEntity(toUserMap(null)), 59.59),
|
||||||
|
Map.entry(toClientEntity(toClientIdMap("client-id-2")), 60.60)
|
||||||
|
));
|
||||||
|
|
||||||
|
private final Map<ClientQuotaEntity, Double> matchIpEntities = Map.of(
|
||||||
|
toIpEntity(Optional.of("1.2.3.4")), 10.0,
|
||||||
|
toIpEntity(Optional.of("2.3.4.5")), 20.0
|
||||||
|
);
|
||||||
|
|
||||||
|
private void setupDescribeClientQuotasMatchTest() {
|
||||||
|
Map<ClientQuotaEntity, Map<String, Optional<Double>>> userClientQuotas = matchUserClientEntities.entrySet()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey,
|
||||||
|
e -> Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(e.getValue()))));
|
||||||
|
|
||||||
|
Map<ClientQuotaEntity, Map<String, Optional<Double>>> ipQuotas = matchIpEntities.entrySet()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey,
|
||||||
|
e -> Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(e.getValue()))));
|
||||||
|
|
||||||
|
Map<ClientQuotaEntity, Map<String, Optional<Double>>> allQuotas = new HashMap<>();
|
||||||
|
allQuotas.putAll(userClientQuotas);
|
||||||
|
allQuotas.putAll(ipQuotas);
|
||||||
|
Map<ClientQuotaEntity, KafkaFuture<Void>> result = alterClientQuotas(allQuotas, false);
|
||||||
|
|
||||||
|
matchUserClientEntities.forEach((entity, value) -> {
|
||||||
|
try {
|
||||||
|
result.get(entity).get(10, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
matchIpEntities.forEach((entity, value) -> {
|
||||||
|
try {
|
||||||
|
result.get(entity).get(10, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<ClientQuotaEntity, Map<String, Double>> matchEntity(ClientQuotaEntity entity)
|
||||||
|
throws ExecutionException, InterruptedException {
|
||||||
|
List<ClientQuotaFilterComponent> components = entity.entries().entrySet().stream().map(entry -> {
|
||||||
|
if (entry.getValue() == null) {
|
||||||
|
return ClientQuotaFilterComponent.ofDefaultEntity(entry.getKey());
|
||||||
|
} else {
|
||||||
|
return ClientQuotaFilterComponent.ofEntity(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}).toList();
|
||||||
|
|
||||||
|
return describeClientQuotas(ClientQuotaFilter.containsOnly(components)).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testDescribeClientQuotasMatchExact() throws ExecutionException, InterruptedException {
|
||||||
|
setupDescribeClientQuotasMatchTest();
|
||||||
|
|
||||||
|
// Test exact matches.
|
||||||
|
matchUserClientEntities.forEach((e, v) -> {
|
||||||
|
try {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
|
||||||
|
Map<ClientQuotaEntity, Map<String, Double>> result = matchEntity(e);
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertNotNull(result.get(e));
|
||||||
|
double value = result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
|
||||||
|
assertEquals(value, v, 1e-6);
|
||||||
|
});
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Entities not contained in `matchEntityList`.
|
||||||
|
List<ClientQuotaEntity> notMatchEntities = List.of(
|
||||||
|
toClientEntity(toUserMap("user-1"), toClientIdMap("client-id-2")),
|
||||||
|
toClientEntity(toUserMap("user-3"), toClientIdMap("client-id-1")),
|
||||||
|
toClientEntity(toUserMap("user-2"), toClientIdMap(null)),
|
||||||
|
toClientEntity(toUserMap("user-4")),
|
||||||
|
toClientEntity(toUserMap(null), toClientIdMap("client-id-2")),
|
||||||
|
toClientEntity(toClientIdMap("client-id-1")),
|
||||||
|
toClientEntity(toClientIdMap("client-id-3"))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify exact matches of the non-matches returns empty.
|
||||||
|
for (ClientQuotaEntity e : notMatchEntities) {
|
||||||
|
Map<ClientQuotaEntity, Map<String, Double>> result = matchEntity(e);
|
||||||
|
assertEquals(0, result.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testMatchEntities(ClientQuotaFilter filter, int expectedMatchSize, Predicate<ClientQuotaEntity> partition)
|
||||||
|
throws InterruptedException {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
|
||||||
|
Map<ClientQuotaEntity, Map<String, Double>> result = describeClientQuotas(filter).get();
|
||||||
|
List<Map.Entry<ClientQuotaEntity, Double>> expectedMatches = matchUserClientEntities.entrySet()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.partitioningBy(entry -> partition.test(entry.getKey())))
|
||||||
|
.get(true);
|
||||||
|
expectedMatches.addAll(matchIpEntities.entrySet()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.partitioningBy(entry -> partition.test(entry.getKey())))
|
||||||
|
.get(true));
|
||||||
|
|
||||||
|
// for test verification
|
||||||
|
assertEquals(expectedMatchSize, expectedMatches.size());
|
||||||
|
assertEquals(expectedMatchSize, result.size(),
|
||||||
|
"Failed to match " + expectedMatchSize + "entities for " + filter);
|
||||||
|
Map<Object, Object> expectedMatchesMap = Map.ofEntries(expectedMatches.toArray(new Map.Entry[0]));
|
||||||
|
matchUserClientEntities.forEach((entity, expectedValue) -> {
|
||||||
|
if (expectedMatchesMap.containsKey(entity)) {
|
||||||
|
Map<String, Double> config = result.get(entity);
|
||||||
|
assertNotNull(config);
|
||||||
|
Double value = config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
|
||||||
|
assertNotNull(value);
|
||||||
|
assertEquals(expectedValue, value, 1e-6);
|
||||||
|
} else {
|
||||||
|
assertNull(result.get(entity));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
matchIpEntities.forEach((entity, expectedValue) -> {
|
||||||
|
if (expectedMatchesMap.containsKey(entity)) {
|
||||||
|
Map<String, Double> config = result.get(entity);
|
||||||
|
assertNotNull(config);
|
||||||
|
Double value = config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG);
|
||||||
|
assertNotNull(value);
|
||||||
|
assertEquals(expectedValue, value, 1e-6);
|
||||||
|
} else {
|
||||||
|
assertNull(result.get(entity));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testDescribeClientQuotasMatchPartial() throws InterruptedException {
|
||||||
|
setupDescribeClientQuotasMatchTest();
|
||||||
|
|
||||||
|
// Match open-ended existing user.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1"))),
|
||||||
|
3,
|
||||||
|
entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-1")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match open-ended non-existent user.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown"))),
|
||||||
|
0,
|
||||||
|
entity -> false
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match open-ended existing client ID.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2"))),
|
||||||
|
2,
|
||||||
|
entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.CLIENT_ID), "client-id-2")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match open-ended default user.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER))),
|
||||||
|
2,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.USER) && entity.entries().get(ClientQuotaEntity.USER) == null
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match close-ended existing user.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2"))),
|
||||||
|
1,
|
||||||
|
entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-2") && !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match close-ended existing client ID that has no matching entity.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1"))),
|
||||||
|
0,
|
||||||
|
entity -> false
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match against all entities with the user type in a close-ended match.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))),
|
||||||
|
4,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.USER) && !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match against all entities with the user type in an open-ended match.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))),
|
||||||
|
10,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.USER)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match against all entities with the client ID type in a close-ended match.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))),
|
||||||
|
1,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries().containsKey(ClientQuotaEntity.USER)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match against all entities with the client ID type in an open-ended match.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))),
|
||||||
|
7,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match against all entities with IP type in an open-ended match.
|
||||||
|
testMatchEntities(
|
||||||
|
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP))),
|
||||||
|
2,
|
||||||
|
entity -> entity.entries().containsKey(ClientQuotaEntity.IP)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Match open-ended empty filter List. This should match all entities.
|
||||||
|
testMatchEntities(ClientQuotaFilter.contains(List.of()), 13, entity -> true);
|
||||||
|
|
||||||
|
// Match close-ended empty filter List. This should match no entities.
|
||||||
|
testMatchEntities(ClientQuotaFilter.containsOnly(List.of()), 0, entity -> false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClientQuotasUnsupportedEntityTypes() {
|
||||||
|
ClientQuotaEntity entity = new ClientQuotaEntity(Map.of("other", "name"));
|
||||||
|
KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> future = describeClientQuotas(
|
||||||
|
ClientQuotaFilter.containsOnly(getComponents(entity)));
|
||||||
|
|
||||||
|
TestUtils.assertFutureThrows(UnsupportedVersionException.class, future);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClientQuotasSanitized() throws InterruptedException {
|
||||||
|
// An entity with name that must be sanitized when writing to Zookeeper.
|
||||||
|
ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user with spaces"));
|
||||||
|
|
||||||
|
alterEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0)
|
||||||
|
), false);
|
||||||
|
|
||||||
|
verifyDescribeEntityQuotas(entity, Map.of(
|
||||||
|
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> toUserMap(String user) {
|
||||||
|
// Uses Collections.singletonMap instead of Map.of to support null user parameter.
|
||||||
|
return Collections.singletonMap(ClientQuotaEntity.USER, user);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> toClientIdMap(String clientId) {
|
||||||
|
// Uses Collections.singletonMap instead of Map.of to support null client-id parameter.
|
||||||
|
return Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, clientId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SafeVarargs
|
||||||
|
private ClientQuotaEntity toClientEntity(Map<String, String>... entries) {
|
||||||
|
Map<String, String> entityMap = new HashMap<>();
|
||||||
|
for (Map<String, String> entry : entries) {
|
||||||
|
entityMap.putAll(entry);
|
||||||
|
}
|
||||||
|
return new ClientQuotaEntity(entityMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientQuotaEntity toIpEntity(Optional<String> ip) {
|
||||||
|
return new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, ip.orElse(null)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyDescribeEntityQuotas(ClientQuotaEntity entity, Map<String, Double> quotas)
|
||||||
|
throws InterruptedException {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
|
||||||
|
|
||||||
|
Map<ClientQuotaEntity, Map<String, Double>> describe = describeClientQuotas(
|
||||||
|
ClientQuotaFilter.containsOnly(getComponents(entity))).get();
|
||||||
|
if (quotas.isEmpty()) {
|
||||||
|
assertEquals(0, describe.size());
|
||||||
|
} else {
|
||||||
|
assertEquals(1, describe.size());
|
||||||
|
Map<String, Double> configs = describe.get(entity);
|
||||||
|
assertNotNull(configs);
|
||||||
|
assertEquals(quotas.size(), configs.size());
|
||||||
|
|
||||||
|
quotas.forEach((k, v) -> {
|
||||||
|
Double value = configs.get(k);
|
||||||
|
assertNotNull(value);
|
||||||
|
assertEquals(v, value, 1e-6);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ClientQuotaFilterComponent> getComponents(ClientQuotaEntity entity) {
|
||||||
|
return entity.entries().entrySet().stream().map(entry -> {
|
||||||
|
String entityType = entry.getKey();
|
||||||
|
String entityName = entry.getValue();
|
||||||
|
return Optional.ofNullable(entityName)
|
||||||
|
.map(name -> ClientQuotaFilterComponent.ofEntity(entityType, name))
|
||||||
|
.orElseGet(() -> ClientQuotaFilterComponent.ofDefaultEntity(entityType));
|
||||||
|
}).toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
private KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> describeClientQuotas(ClientQuotaFilter filter) {
|
||||||
|
try (Admin admin = cluster.admin()) {
|
||||||
|
return admin.describeClientQuotas(filter).entities();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private KafkaFuture<Void> alterEntityQuotas(ClientQuotaEntity entity, Map<String, Optional<Double>> alter, boolean validateOnly) {
|
||||||
|
|
||||||
|
return alterClientQuotas(Map.of(entity, alter), validateOnly).get(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<ClientQuotaEntity, KafkaFuture<Void>> alterClientQuotas(Map<ClientQuotaEntity, Map<String,
|
||||||
|
Optional<Double>>> request, boolean validateOnly) {
|
||||||
|
|
||||||
|
List<ClientQuotaAlteration> entries = request.entrySet().stream().map(entry -> {
|
||||||
|
ClientQuotaEntity entity = entry.getKey();
|
||||||
|
Map<String, Optional<Double>> alter = entry.getValue();
|
||||||
|
|
||||||
|
List<ClientQuotaAlteration.Op> ops = alter.entrySet()
|
||||||
|
.stream()
|
||||||
|
.map(configEntry -> new ClientQuotaAlteration.Op(configEntry.getKey(),
|
||||||
|
configEntry.getValue().orElse(null)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return new ClientQuotaAlteration(entity, ops);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
try (Admin admin = cluster.admin()) {
|
||||||
|
Map<ClientQuotaEntity, KafkaFuture<Void>> result = admin.alterClientQuotas(entries,
|
||||||
|
new AlterClientQuotasOptions().validateOnly(validateOnly)).values();
|
||||||
|
assertEquals(request.size(), result.size());
|
||||||
|
request.forEach((e, r) -> assertTrue(result.containsKey(e)));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue