mirror of https://github.com/apache/kafka.git
KAFKA-18225 ClientQuotaCallback#updateClusterMetadata is unsupported by kraft (#18196)
This commit ensures that the ClientQuotaCallback#updateClusterMetadata method is executed in KRaft mode. The method is triggered whenever topic or cluster metadata changes. However, in KRaft mode the current implementation of the updateClusterMetadata API is inefficient because it requires building a full Cluster object. A follow-up issue (KAFKA-18239) has been created to explore more efficient ways of providing cluster information to the ClientQuotaCallback without the overhead of constructing a full Cluster object. Reviewers: Mickael Maison <mickael.maison@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent c3ca56e66b
commit ad6db0952b
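For context, the sketch below shows what a custom quota callback consuming these updateClusterMetadata calls might look like. It is illustrative only: the class name PartitionAwareQuotaCallback, its package, and the quota values are assumptions, not part of this change. Such a callback would be registered on each broker and controller through the client.quota.callback.class property (QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG), the same way the new CustomQuotaCallbackTest below wires in its test callback.

// Hypothetical example (not part of this commit): a ClientQuotaCallback that
// scales the produce quota with the number of partitions it sees in the
// cluster metadata passed to updateClusterMetadata.
package com.example;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionAwareQuotaCallback implements ClientQuotaCallback {

    private static final double BASE_PRODUCE_QUOTA = 1024.0 * 1024.0; // bytes/sec, illustrative value
    private final AtomicInteger partitionCount = new AtomicInteger(0);

    @Override
    public Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) {
        // Group quota metrics per principal; all clients of a user share one bucket.
        return Map.of("user", principal.getName());
    }

    @Override
    public Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags) {
        if (quotaType == ClientQuotaType.PRODUCE) {
            // Scale the base quota by the number of partitions currently known.
            return BASE_PRODUCE_QUOTA * Math.max(1, partitionCount.get());
        }
        return Double.MAX_VALUE;
    }

    @Override
    public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) {}

    @Override
    public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) {}

    @Override
    public boolean quotaResetRequired(ClientQuotaType quotaType) {
        return false;
    }

    @Override
    public boolean updateClusterMetadata(Cluster cluster) {
        // Called when topic or cluster metadata changes; with this commit the new
        // DynamicTopicClusterQuotaPublisher drives these calls in KRaft mode.
        int newCount = cluster.topics().stream()
            .mapToInt(topic -> cluster.partitionCountForTopic(topic))
            .sum();
        // Returning true asks the broker to refresh its quota metric configs.
        return partitionCount.getAndSet(newCount) != newCount;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}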
ClientQuotaCallback.java

@@ -23,7 +23,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import java.util.Map;
 
 /**
- * Quota callback interface for brokers that enables customization of client quota computation.
+ * Quota callback interface for brokers and controllers that enables customization of client quota computation.
  */
 public interface ClientQuotaCallback extends Configurable {
 
@@ -89,8 +89,9 @@ public interface ClientQuotaCallback extends Configurable {
     boolean quotaResetRequired(ClientQuotaType quotaType);
 
     /**
-     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
-     * the controller. This is useful if quota computation takes partitions into account.
+     * This callback is invoked whenever there are changes in the cluster metadata, such as
+     * brokers being added or removed, topics being created or deleted, or partition leadership updates.
+     * This is useful if quota computation takes partitions into account.
      * Topics that are being deleted will not be included in `cluster`.
      *
      * @param cluster Cluster metadata including partitions and their leaders if known
BrokerServer.scala

@@ -509,7 +509,15 @@ class BrokerServer(
           config,
           sharedServer.metadataPublishingFaultHandler,
           "broker",
-          clientQuotaMetadataManager),
+          clientQuotaMetadataManager,
+        ),
+        new DynamicTopicClusterQuotaPublisher(
+          clusterId,
+          config,
+          sharedServer.metadataPublishingFaultHandler,
+          "broker",
+          quotaManagers,
+        ),
         new ScramPublisher(
           config,
           sharedServer.metadataPublishingFaultHandler,
ControllerServer.scala

@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 
 import scala.collection.immutable
-import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network.ListenerName
@@ -326,7 +326,17 @@ class ControllerServer(
       config,
       sharedServer.metadataPublishingFaultHandler,
       "controller",
-      clientQuotaMetadataManager))
+      clientQuotaMetadataManager
+    ))
+
+    // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
+    metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
+      clusterId,
+      config,
+      sharedServer.metadataPublishingFaultHandler,
+      "controller",
+      quotaManagers,
+    ))
 
     // Set up the SCRAM publisher.
     metadataPublishers.add(new ScramPublisher(
MetadataCache.scala

@@ -19,15 +19,20 @@ package kafka.server
 
 import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.admin.BrokerMetadata
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataResponseData
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.common._
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, PartitionRegistration}
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
 
 import java.util
+import java.util.Collections
+import java.util.concurrent.ThreadLocalRandom
 import java.util.function.Supplier
 import scala.collection._
+import scala.jdk.CollectionConverters.CollectionHasAsScala
 
 /**
  * Used to represent the controller id cached in the metadata cache of the broker. This trait is
@@ -123,4 +128,95 @@ object MetadataCache {
   ): KRaftMetadataCache = {
     new KRaftMetadataCache(brokerId, kraftVersionSupplier)
   }
+
+  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
+    val brokerToNodes = new util.HashMap[Integer, util.List[Node]]
+    image.cluster().brokers()
+      .values().stream()
+      .filter(broker => !broker.fenced())
+      .forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) }
+
+    def getNodes(id: Int): util.List[Node] = brokerToNodes.get(id)
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    def toArray(replicas: Array[Int]): Array[Node] = {
+      util.Arrays.stream(replicas)
+        .mapToObj(replica => getNodes(replica))
+        .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))
+    }
+
+    val topicImages = image.topics().topicsByName().values()
+    if (topicImages != null) {
+      topicImages.forEach { topic =>
+        topic.partitions().forEach { (key, value) =>
+          val partitionId = key
+          val partition = value
+          val nodes = getNodes(partition.leader)
+          if (nodes != null) {
+            nodes.forEach(node => {
+              partitionInfos.add(new PartitionInfo(topic.name(),
+                partitionId,
+                node,
+                toArray(partition.replicas),
+                toArray(partition.isr),
+                getOfflineReplicas(image, partition).stream()
+                  .map(replica => getNodes(replica))
+                  .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))))
+            })
+            if (Topic.isInternal(topic.name())) {
+              internalTopics.add(topic.name())
+            }
+          }
+        }
+      }
+    }
+
+    val controllerNode = getNodes(getRandomAliveBroker(image).getOrElse(-1)) match {
+      case null => Node.noNode()
+      case nodes => nodes.get(0)
+    }
+    // Note: the constructor of Cluster does not allow us to reference unregistered nodes.
+    // So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not
+    // registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but
+    // we are duplicating the behavior of ZkMetadataCache, for now.
+    new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()),
+      partitionInfos, Collections.emptySet(), internalTopics, controllerNode)
+  }
+
+  private def getOfflineReplicas(image: MetadataImage,
+                                 partition: PartitionRegistration,
+                                 listenerName: ListenerName = null): util.List[Integer] = {
+    val offlineReplicas = new util.ArrayList[Integer](0)
+    for (brokerId <- partition.replicas) {
+      Option(image.cluster().broker(brokerId)) match {
+        case None => offlineReplicas.add(brokerId)
+        case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) {
+          offlineReplicas.add(brokerId)
+        }
+      }
+    }
+    offlineReplicas
+  }
+
+  private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
+    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)
+
+  private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
+    !broker.hasOnlineDir(partition.directory(broker.id()))
+
+  private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
+    val aliveBrokers = getAliveBrokers(image).toList
+    if (aliveBrokers.isEmpty) {
+      None
+    } else {
+      Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
+    }
+  }
+
+  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
+    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
+      map(b => new BrokerMetadata(b.id, b.rack))
+  }
 }
BrokerMetadataPublisher.scala

@@ -71,6 +71,7 @@ class BrokerMetadataPublisher(
   shareCoordinator: Option[ShareCoordinator],
   var dynamicConfigPublisher: DynamicConfigPublisher,
   dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
+  dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher,
   scramPublisher: ScramPublisher,
   delegationTokenPublisher: DelegationTokenPublisher,
   aclPublisher: AclPublisher,
@@ -199,6 +200,9 @@ class BrokerMetadataPublisher(
       // Apply client quotas delta.
       dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
 
+      // Apply topic or cluster quotas delta.
+      dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
+
       // Apply SCRAM delta.
       scramPublisher.onMetadataUpdate(delta, newImage)
DynamicTopicClusterQuotaPublisher.scala (new file)

@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package kafka.server.metadata
+
+import kafka.server.{KafkaConfig, MetadataCache}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.server.fault.FaultHandler
+
+/**
+ * Publishing dynamic topic or cluster changes to the client quota manager.
+ * Temporary solution since Cluster objects are immutable and costly to update for every metadata change.
+ * See KAFKA-18239 to trace the issue.
+ */
+class DynamicTopicClusterQuotaPublisher (
+  clusterId: String,
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  quotaManagers: QuotaManagers
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}"
+
+  override def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest
+  ): Unit = {
+    onMetadataUpdate(delta, newImage)
+  }
+
+  def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+  ): Unit = {
+    try {
+      quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
+        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+          val cluster = MetadataCache.toCluster(clusterId, newImage)
+          if (clientQuotaCallback.updateClusterMetadata(cluster)) {
+            quotaManagers.fetch.updateQuotaMetricConfigs()
+            quotaManagers.produce.updateQuotaMetricConfigs()
+            quotaManagers.request.updateQuotaMetricConfigs()
+            quotaManagers.controllerMutation.updateQuotaMetricConfigs()
+          }
+        }
+      })
+    } catch {
+      case t: Throwable =>
+        val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+        faultHandler.handleFault("Uncaught exception while " +
+          s"publishing dynamic topic or cluster changes from $deltaName", t)
+    }
+  }
+}
CustomQuotaCallbackTest.java (new file)

@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@ClusterTestDefaults(controllers = 3,
+    types = {Type.KRAFT},
+    serverProperties = {
+        @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
+    }
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class CustomQuotaCallbackTest {
+
+    private final ClusterInstance cluster;
+
+    public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException {
+
+        try (Admin admin = cluster.admin(Map.of())) {
+            admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
+            TestUtils.waitForCondition(
+                () -> CustomQuotaCallback.COUNTERS.size() == 3
+                    && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0),
+                "The CustomQuotaCallback not triggered in all controllers. "
+            );
+
+            // Reset the counters, and we expect the callback to be triggered again in all controllers
+            CustomQuotaCallback.COUNTERS.clear();
+
+            admin.deleteTopics(List.of("topic"));
+            TestUtils.waitForCondition(
+                () -> CustomQuotaCallback.COUNTERS.size() == 3
+                    && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0),
+                "The CustomQuotaCallback not triggered in all controllers. "
+            );
+
+        }
+    }
+
+
+    public static class CustomQuotaCallback implements ClientQuotaCallback {
+
+        public static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();
+        private String nodeId;
+
+        @Override
+        public Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) {
+            return Map.of();
+        }
+
+        @Override
+        public Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags) {
+            return Double.MAX_VALUE;
+        }
+
+        @Override
+        public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) {
+
+        }
+
+        @Override
+        public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) {
+
+        }
+
+        @Override
+        public boolean quotaResetRequired(ClientQuotaType quotaType) {
+            return true;
+        }
+
+        @Override
+        public boolean updateClusterMetadata(Cluster cluster) {
+            COUNTERS.computeIfAbsent(nodeId, k -> new AtomicInteger()).incrementAndGet();
+            return true;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            nodeId = (String) configs.get("node.id");
+        }
+
+    }
+}
CustomQuotaCallbackTest.scala

@@ -26,11 +26,13 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.{Cluster, Reconfigurable}
+import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
 import org.apache.kafka.server.quota._
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -38,11 +40,10 @@ import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.{lang, util}
-import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 
-@Disabled("KAFKA-18213")
 class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -70,6 +71,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}",
       classOf[GroupedUserPrincipalBuilder].getName)
     this.serverConfig.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true")
+    val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
+    superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext)
     super.setUp(testInfo)
 
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
@@ -85,13 +88,23 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     closeSasl()
   }
 
-  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
-    super.configureSecurityBeforeServersStart(testInfo)
+  override def configureSecurityAfterServersStart(): Unit = {
+    super.configureSecurityAfterServersStart()
     createScramCredentials(createAdminClient(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
   }
 
+  override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setScramArguments(
+      util.List.of(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]"))
+  }
+
+  override def createPrivilegedAdminClient() = {
+    createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
+      kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testCustomQuotaCallback(quorum: String, groupProtocol: String): Unit = {
     // Large quota override, should not throttle
     var brokerId = 0
@@ -141,6 +154,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
     // Remove quota override and test default quota applied with scaling based on partitions
     user = addUser("group1_user2", brokerId)
+    user.removeQuotaOverrides()
    user.waitForQuotaUpdate(defaultProduceQuota / 100, defaultConsumeQuota / 100, defaultRequestQuota)
     user.removeThrottleMetrics() // since group was throttled before
     user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
@@ -178,14 +192,34 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     user
   }
 
+  override def deleteTopic(
+    topic: String,
+    listenerName: ListenerName = listenerName
+  ): Unit = {
+    Using.resource(createAdminClient()) { admin =>
+      TestUtils.deleteTopicWithAdmin(
+        admin = admin,
+        topic = topic,
+        brokers = aliveBrokers,
+        controllers = controllerServers)
+    }
+  }
+
   private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
-    // TODO createTopic
+    val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
+    TestUtils.createTopicWithAdmin(
+      createAdminClient(),
+      topic,
+      brokers,
+      controllerServers,
+      numPartitions,
+      replicaAssignment = assignment
+    )
   }
 
   private def createAdminClient(): Admin = {
     val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
-      TestUtils.bootstrapServers(servers, new ListenerName("BROKER")))
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) =>
       config.put(key.toString, value)
     }
@@ -230,11 +264,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString)
 
-    GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId,
+    GroupedUser(user, userGroup, topic, brokerServers(leader), producerClientId, consumerClientId,
       createProducer(), createConsumer(), adminClient)
   }
 
-  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker,
+  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: BrokerServer,
                          producerClientId: String, consumerClientId: String,
                          override val producer: KafkaProducer[Array[Byte], Array[Byte]],
                          override val consumer: Consumer[Array[Byte], Array[Byte]],
@@ -315,7 +349,7 @@ object GroupedUserPrincipalBuilder {
   }
 }
 
-class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder {
+class GroupedUserPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
   override def build(context: AuthenticationContext): KafkaPrincipal = {
     val securityProtocol = context.securityProtocol
     if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
@@ -395,7 +429,15 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
   }
 
   override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
-    principal match {
+    val user = principal.getName
+    val userGroup = group(user)
+    val newPrincipal = {
+      if (userGroup.isEmpty)
+        principal
+      else
+        GroupedUserPrincipal(user, userGroup)
+    }
+    newPrincipal match {
       case groupPrincipal: GroupedUserPrincipal =>
         val userGroup = groupPrincipal.userGroup
         val quotaLimit = quotaOrDefault(userGroup, quotaType)
@@ -468,5 +510,3 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
     }
   }
 }
-
-
BrokerMetadataPublisherTest.scala

@@ -198,6 +198,7 @@ class BrokerMetadataPublisherTest {
       Some(mock(classOf[ShareCoordinator])),
       mock(classOf[DynamicConfigPublisher]),
       mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[DynamicTopicClusterQuotaPublisher]),
       mock(classOf[ScramPublisher]),
       mock(classOf[DelegationTokenPublisher]),
      mock(classOf[AclPublisher]),
BrokerRegistration.java

@@ -236,6 +236,10 @@ public class BrokerRegistration {
         return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced));
     }
 
+    public List<Node> nodes() {
+        return listeners.keySet().stream().flatMap(l -> node(l).stream()).toList();
+    }
+
     public Map<String, VersionRange> supportedFeatures() {
         return supportedFeatures;
     }