mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-18225 ClientQuotaCallback#updateClusterMetadata is unsupported by kraft (#18196)
This commit ensures that the ClientQuotaCallback#updateClusterMetadata method is executed in KRaft mode. This method is triggered whenever a topic or cluster metadata change occurs. However, in KRaft mode, the current implementation of the updateClusterMetadata API is inefficient due to the requirement of creating a full Cluster object. To address this, a follow-up issue (KAFKA-18239) has been created to explore more efficient mechanisms for providing cluster information to the ClientQuotaCallback without incurring the overhead of a full Cluster object creation. Reviewers: Mickael Maison <mickael.maison@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
		
							parent
							
								
									d0f4c2f844
								
							
						
					
					
						commit
						70adf746c4
					
				|  | @ -23,7 +23,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; | |||
| import java.util.Map; | ||||
| 
 | ||||
| /** | ||||
|  * Quota callback interface for brokers that enables customization of client quota computation. | ||||
|  * Quota callback interface for brokers and controllers that enables customization of client quota computation. | ||||
|  */ | ||||
| public interface ClientQuotaCallback extends Configurable { | ||||
| 
 | ||||
|  | @ -89,8 +89,9 @@ public interface ClientQuotaCallback extends Configurable { | |||
|     boolean quotaResetRequired(ClientQuotaType quotaType); | ||||
| 
 | ||||
|     /** | ||||
|      * Metadata update callback that is invoked whenever UpdateMetadata request is received from | ||||
|      * the controller. This is useful if quota computation takes partitions into account. | ||||
|      * This callback is invoked whenever there are changes in the cluster metadata, such as  | ||||
|      * brokers being added or removed, topics being created or deleted, or partition leadership updates. | ||||
|      * This is useful if quota computation takes partitions into account. | ||||
|      * Topics that are being deleted will not be included in `cluster`. | ||||
|      * | ||||
|      * @param cluster Cluster metadata including partitions and their leaders if known | ||||
|  |  | |||
|  | @ -507,7 +507,15 @@ class BrokerServer( | |||
|           config, | ||||
|           sharedServer.metadataPublishingFaultHandler, | ||||
|           "broker", | ||||
|           clientQuotaMetadataManager), | ||||
|           clientQuotaMetadataManager, | ||||
|         ), | ||||
|         new DynamicTopicClusterQuotaPublisher( | ||||
|           clusterId, | ||||
|           config, | ||||
|           sharedServer.metadataPublishingFaultHandler, | ||||
|           "broker", | ||||
|           quotaManagers, | ||||
|         ), | ||||
|         new ScramPublisher( | ||||
|           config, | ||||
|           sharedServer.metadataPublishingFaultHandler, | ||||
|  |  | |||
|  | @ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager | |||
| import kafka.server.QuotaFactory.QuotaManagers | ||||
| 
 | ||||
| import scala.collection.immutable | ||||
| import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher} | ||||
| import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher} | ||||
| import kafka.utils.{CoreUtils, Logging} | ||||
| import org.apache.kafka.common.message.ApiMessageType.ListenerType | ||||
| import org.apache.kafka.common.network.ListenerName | ||||
|  | @ -332,7 +332,17 @@ class ControllerServer( | |||
|         config, | ||||
|         sharedServer.metadataPublishingFaultHandler, | ||||
|         "controller", | ||||
|         clientQuotaMetadataManager)) | ||||
|         clientQuotaMetadataManager | ||||
|       )) | ||||
| 
 | ||||
|       // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics. | ||||
|       metadataPublishers.add(new DynamicTopicClusterQuotaPublisher( | ||||
|         clusterId, | ||||
|         config, | ||||
|         sharedServer.metadataPublishingFaultHandler, | ||||
|         "controller", | ||||
|         quotaManagers, | ||||
|       )) | ||||
| 
 | ||||
|       // Set up the SCRAM publisher. | ||||
|       metadataPublishers.add(new ScramPublisher( | ||||
|  |  | |||
|  | @ -19,15 +19,20 @@ package kafka.server | |||
| 
 | ||||
| import kafka.server.metadata.KRaftMetadataCache | ||||
| import org.apache.kafka.admin.BrokerMetadata | ||||
| import org.apache.kafka.common.internals.Topic | ||||
| import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData} | ||||
| import org.apache.kafka.common.network.ListenerName | ||||
| import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} | ||||
| import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr} | ||||
| import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} | ||||
| import org.apache.kafka.image.MetadataImage | ||||
| import org.apache.kafka.metadata.{BrokerRegistration, ConfigRepository, LeaderAndIsr, PartitionRegistration} | ||||
| import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} | ||||
| 
 | ||||
| import java.util | ||||
| import java.util.Collections | ||||
| import java.util.concurrent.ThreadLocalRandom | ||||
| import java.util.function.Supplier | ||||
| import scala.collection._ | ||||
| import scala.jdk.CollectionConverters.CollectionHasAsScala | ||||
| 
 | ||||
| trait MetadataCache extends ConfigRepository { | ||||
|   /** | ||||
|  | @ -144,4 +149,95 @@ object MetadataCache { | |||
|   ): KRaftMetadataCache = { | ||||
|     new KRaftMetadataCache(brokerId, kraftVersionSupplier) | ||||
|   } | ||||
| 
 | ||||
|   def toCluster(clusterId: String, image: MetadataImage): Cluster = { | ||||
|     val brokerToNodes = new util.HashMap[Integer, util.List[Node]] | ||||
|     image.cluster().brokers() | ||||
|       .values().stream() | ||||
|       .filter(broker => !broker.fenced()) | ||||
|       .forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) } | ||||
| 
 | ||||
|     def getNodes(id: Int): util.List[Node] = brokerToNodes.get(id) | ||||
| 
 | ||||
|     val partitionInfos = new util.ArrayList[PartitionInfo] | ||||
|     val internalTopics = new util.HashSet[String] | ||||
| 
 | ||||
|     def toArray(replicas: Array[Int]): Array[Node] = { | ||||
|       util.Arrays.stream(replicas) | ||||
|         .mapToObj(replica => getNodes(replica)) | ||||
|         .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size)) | ||||
|     } | ||||
| 
 | ||||
|     val topicImages = image.topics().topicsByName().values() | ||||
|     if (topicImages != null) { | ||||
|       topicImages.forEach { topic => | ||||
|         topic.partitions().forEach { (key, value) => | ||||
|           val partitionId = key | ||||
|           val partition = value | ||||
|           val nodes = getNodes(partition.leader) | ||||
|           if (nodes != null) { | ||||
|             nodes.forEach(node => { | ||||
|               partitionInfos.add(new PartitionInfo(topic.name(), | ||||
|                 partitionId, | ||||
|                 node, | ||||
|                 toArray(partition.replicas), | ||||
|                 toArray(partition.isr), | ||||
|                 getOfflineReplicas(image, partition).stream() | ||||
|                   .map(replica => getNodes(replica)) | ||||
|                   .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size)))) | ||||
|             }) | ||||
|             if (Topic.isInternal(topic.name())) { | ||||
|               internalTopics.add(topic.name()) | ||||
|             } | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     val controllerNode = getNodes(getRandomAliveBroker(image).getOrElse(-1)) match { | ||||
|       case null => Node.noNode() | ||||
|       case nodes => nodes.get(0) | ||||
|     } | ||||
|     // Note: the constructor of Cluster does not allow us to reference unregistered nodes. | ||||
|     // So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not | ||||
|     // registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but | ||||
|     // we are duplicating the behavior of ZkMetadataCache, for now. | ||||
|     new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()), | ||||
|       partitionInfos, Collections.emptySet(), internalTopics, controllerNode) | ||||
|   } | ||||
| 
 | ||||
|   private def getOfflineReplicas(image: MetadataImage, | ||||
|                                  partition: PartitionRegistration, | ||||
|                                  listenerName: ListenerName = null): util.List[Integer] = { | ||||
|     val offlineReplicas = new util.ArrayList[Integer](0) | ||||
|     for (brokerId <- partition.replicas) { | ||||
|       Option(image.cluster().broker(brokerId)) match { | ||||
|         case None => offlineReplicas.add(brokerId) | ||||
|         case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) { | ||||
|           offlineReplicas.add(brokerId) | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|     offlineReplicas | ||||
|   } | ||||
| 
 | ||||
|   private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = | ||||
|     broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition) | ||||
| 
 | ||||
|   private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = | ||||
|     !broker.hasOnlineDir(partition.directory(broker.id())) | ||||
| 
 | ||||
|   private def getRandomAliveBroker(image: MetadataImage): Option[Int] = { | ||||
|     val aliveBrokers = getAliveBrokers(image).toList | ||||
|     if (aliveBrokers.isEmpty) { | ||||
|       None | ||||
|     } else { | ||||
|       Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = { | ||||
|     image.cluster().brokers().values().asScala.filterNot(_.fenced()). | ||||
|       map(b => new BrokerMetadata(b.id, b.rack)) | ||||
|   } | ||||
| } | ||||
|  |  | |||
|  | @ -71,6 +71,7 @@ class BrokerMetadataPublisher( | |||
|   shareCoordinator: Option[ShareCoordinator], | ||||
|   var dynamicConfigPublisher: DynamicConfigPublisher, | ||||
|   dynamicClientQuotaPublisher: DynamicClientQuotaPublisher, | ||||
|   dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher, | ||||
|   scramPublisher: ScramPublisher, | ||||
|   delegationTokenPublisher: DelegationTokenPublisher, | ||||
|   aclPublisher: AclPublisher, | ||||
|  | @ -199,6 +200,9 @@ class BrokerMetadataPublisher( | |||
|       // Apply client quotas delta. | ||||
|       dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage) | ||||
| 
 | ||||
|       // Apply topic or cluster quotas delta. | ||||
|       dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage) | ||||
| 
 | ||||
|       // Apply SCRAM delta. | ||||
|       scramPublisher.onMetadataUpdate(delta, newImage) | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,71 @@ | |||
| /** | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  * http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  **/ | ||||
| package kafka.server.metadata | ||||
| 
 | ||||
| import kafka.server.{KafkaConfig, MetadataCache} | ||||
| import kafka.server.QuotaFactory.QuotaManagers | ||||
| import kafka.utils.Logging | ||||
| import org.apache.kafka.image.{MetadataDelta, MetadataImage} | ||||
| import org.apache.kafka.image.loader.LoaderManifest | ||||
| import org.apache.kafka.server.fault.FaultHandler | ||||
| 
 | ||||
| /** | ||||
|  * Publishing dynamic topic or cluster changes to the client quota manager. | ||||
|  * Temporary solution since Cluster objects are immutable and costly to update for every metadata change. | ||||
|  * See KAFKA-18239 to trace the issue. | ||||
|  */ | ||||
| class DynamicTopicClusterQuotaPublisher ( | ||||
|   clusterId: String, | ||||
|   conf: KafkaConfig, | ||||
|   faultHandler: FaultHandler, | ||||
|   nodeType: String, | ||||
|   quotaManagers: QuotaManagers | ||||
| ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { | ||||
|   logIdent = s"[${name()}] " | ||||
| 
 | ||||
|   override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}" | ||||
| 
 | ||||
|   override def onMetadataUpdate( | ||||
|     delta: MetadataDelta, | ||||
|     newImage: MetadataImage, | ||||
|     manifest: LoaderManifest | ||||
|   ): Unit = { | ||||
|     onMetadataUpdate(delta, newImage) | ||||
|   } | ||||
| 
 | ||||
|   def onMetadataUpdate( | ||||
|     delta: MetadataDelta, | ||||
|     newImage: MetadataImage, | ||||
|   ): Unit = { | ||||
|     try { | ||||
|       quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => { | ||||
|         if (delta.topicsDelta() != null || delta.clusterDelta() != null) { | ||||
|           val cluster = MetadataCache.toCluster(clusterId, newImage) | ||||
|           if (clientQuotaCallback.updateClusterMetadata(cluster)) { | ||||
|             quotaManagers.fetch.updateQuotaMetricConfigs() | ||||
|             quotaManagers.produce.updateQuotaMetricConfigs() | ||||
|             quotaManagers.request.updateQuotaMetricConfigs() | ||||
|             quotaManagers.controllerMutation.updateQuotaMetricConfigs() | ||||
|           } | ||||
|         } | ||||
|       }) | ||||
|     } catch { | ||||
|       case t: Throwable => | ||||
|         val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" | ||||
|         faultHandler.handleFault("Uncaught exception while " + | ||||
|           s"publishing dynamic topic or cluster changes from $deltaName", t) | ||||
|     } | ||||
|   } | ||||
| } | ||||
|   | ||||
|  | @ -0,0 +1,131 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package kafka.test.api; | ||||
| 
 | ||||
| import org.apache.kafka.clients.admin.Admin; | ||||
| import org.apache.kafka.clients.admin.NewTopic; | ||||
| import org.apache.kafka.common.Cluster; | ||||
| import org.apache.kafka.common.security.auth.KafkaPrincipal; | ||||
| import org.apache.kafka.common.test.ClusterInstance; | ||||
| import org.apache.kafka.common.test.TestUtils; | ||||
| import org.apache.kafka.common.test.api.ClusterConfigProperty; | ||||
| import org.apache.kafka.common.test.api.ClusterTest; | ||||
| import org.apache.kafka.common.test.api.ClusterTestDefaults; | ||||
| import org.apache.kafka.common.test.api.Type; | ||||
| import org.apache.kafka.common.test.junit.ClusterTestExtensions; | ||||
| import org.apache.kafka.server.config.QuotaConfig; | ||||
| import org.apache.kafka.server.quota.ClientQuotaCallback; | ||||
| import org.apache.kafka.server.quota.ClientQuotaEntity; | ||||
| import org.apache.kafka.server.quota.ClientQuotaType; | ||||
| 
 | ||||
| import org.junit.jupiter.api.extension.ExtendWith; | ||||
| 
 | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||
| 
 | ||||
| @ClusterTestDefaults(controllers = 3,  | ||||
|     types = {Type.KRAFT}, | ||||
|     serverProperties = { | ||||
|         @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), | ||||
|         @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), | ||||
|         @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), | ||||
|     } | ||||
| ) | ||||
| @ExtendWith(ClusterTestExtensions.class) | ||||
| public class CustomQuotaCallbackTest { | ||||
| 
 | ||||
|     private final ClusterInstance cluster; | ||||
| 
 | ||||
|     public CustomQuotaCallbackTest(ClusterInstance clusterInstance) { | ||||
|         this.cluster = clusterInstance; | ||||
|     } | ||||
| 
 | ||||
|     @ClusterTest | ||||
|     public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException { | ||||
|          | ||||
|         try (Admin admin = cluster.admin(Map.of())) { | ||||
|             admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1))); | ||||
|             TestUtils.waitForCondition( | ||||
|                 () -> CustomQuotaCallback.COUNTERS.size() == 3  | ||||
|                         && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0),  | ||||
|                     "The CustomQuotaCallback not triggered in all controllers. " | ||||
|             ); | ||||
|              | ||||
|             // Reset the counters, and we expect the callback to be triggered again in all controllers | ||||
|             CustomQuotaCallback.COUNTERS.clear(); | ||||
|              | ||||
|             admin.deleteTopics(List.of("topic")); | ||||
|             TestUtils.waitForCondition( | ||||
|                 () -> CustomQuotaCallback.COUNTERS.size() == 3 | ||||
|                         && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0),  | ||||
|                     "The CustomQuotaCallback not triggered in all controllers. " | ||||
|             ); | ||||
|              | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|     public static class CustomQuotaCallback implements ClientQuotaCallback { | ||||
| 
 | ||||
|         public static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<>(); | ||||
|         private String nodeId; | ||||
| 
 | ||||
|         @Override | ||||
|         public Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) { | ||||
|             return Map.of(); | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags) { | ||||
|             return Double.MAX_VALUE; | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) { | ||||
| 
 | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) { | ||||
| 
 | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public boolean quotaResetRequired(ClientQuotaType quotaType) { | ||||
|             return true; | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public boolean updateClusterMetadata(Cluster cluster) { | ||||
|             COUNTERS.computeIfAbsent(nodeId, k -> new AtomicInteger()).incrementAndGet(); | ||||
|             return true; | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public void close() { | ||||
| 
 | ||||
|         } | ||||
| 
 | ||||
|         @Override | ||||
|         public void configure(Map<String, ?> configs) { | ||||
|             nodeId = (String) configs.get("node.id"); | ||||
|         } | ||||
| 
 | ||||
|     } | ||||
| } | ||||
|  | @ -26,11 +26,13 @@ import org.apache.kafka.common.config.SaslConfigs | |||
| import org.apache.kafka.common.config.internals.BrokerSecurityConfigs | ||||
| import org.apache.kafka.common.network.ListenerName | ||||
| import org.apache.kafka.common.security.auth._ | ||||
| import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder | ||||
| import org.apache.kafka.common.{Cluster, Reconfigurable} | ||||
| import org.apache.kafka.metadata.storage.Formatter | ||||
| import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs} | ||||
| import org.apache.kafka.server.quota._ | ||||
| import org.junit.jupiter.api.Assertions._ | ||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} | ||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} | ||||
| import org.junit.jupiter.params.ParameterizedTest | ||||
| import org.junit.jupiter.params.provider.MethodSource | ||||
| 
 | ||||
|  | @ -38,11 +40,10 @@ import java.util.Properties | |||
| import java.util.concurrent.ConcurrentHashMap | ||||
| import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} | ||||
| import java.{lang, util} | ||||
| import scala.collection.Seq | ||||
| import scala.collection.mutable.ArrayBuffer | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.util.Using | ||||
| 
 | ||||
| @Disabled("KAFKA-18213") | ||||
| class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | ||||
| 
 | ||||
|   override protected def securityProtocol = SecurityProtocol.SASL_SSL | ||||
|  | @ -70,6 +71,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | |||
|     this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}", | ||||
|       classOf[GroupedUserPrincipalBuilder].getName) | ||||
|     this.serverConfig.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true") | ||||
|     val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism) | ||||
|     superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext) | ||||
|     super.setUp(testInfo) | ||||
| 
 | ||||
|     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, | ||||
|  | @ -85,13 +88,23 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | |||
|     closeSasl() | ||||
|   } | ||||
| 
 | ||||
|   override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { | ||||
|     super.configureSecurityBeforeServersStart(testInfo) | ||||
|   override def configureSecurityAfterServersStart(): Unit = { | ||||
|     super.configureSecurityAfterServersStart() | ||||
|     createScramCredentials(createAdminClient(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) | ||||
|   } | ||||
| 
 | ||||
|   override def addFormatterSettings(formatter: Formatter): Unit = { | ||||
|     formatter.setScramArguments( | ||||
|       util.List.of(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]")) | ||||
|   } | ||||
| 
 | ||||
|   override def createPrivilegedAdminClient() = { | ||||
|     createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties, | ||||
|       kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||||
|   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) | ||||
|   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||||
|   def testCustomQuotaCallback(quorum: String, groupProtocol: String): Unit = { | ||||
|     // Large quota override, should not throttle | ||||
|     var brokerId = 0 | ||||
|  | @ -141,6 +154,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | |||
| 
 | ||||
|     // Remove quota override and test default quota applied with scaling based on partitions | ||||
|     user = addUser("group1_user2", brokerId) | ||||
|     user.removeQuotaOverrides() | ||||
|     user.waitForQuotaUpdate(defaultProduceQuota / 100, defaultConsumeQuota / 100, defaultRequestQuota) | ||||
|     user.removeThrottleMetrics() // since group was throttled before | ||||
|     user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false) | ||||
|  | @ -178,14 +192,34 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | |||
|     user | ||||
|   } | ||||
| 
 | ||||
|   override def deleteTopic( | ||||
|     topic: String,  | ||||
|     listenerName: ListenerName = listenerName | ||||
|   ): Unit = { | ||||
|     Using.resource(createAdminClient()) { admin => | ||||
|       TestUtils.deleteTopicWithAdmin( | ||||
|         admin = admin, | ||||
|         topic = topic, | ||||
|         brokers = aliveBrokers, | ||||
|         controllers = controllerServers) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = { | ||||
|     // TODO createTopic  | ||||
|     val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap | ||||
|     TestUtils.createTopicWithAdmin( | ||||
|       createAdminClient(), | ||||
|       topic, | ||||
|       brokers, | ||||
|       controllerServers, | ||||
|       numPartitions, | ||||
|       replicaAssignment = assignment | ||||
|     ) | ||||
|   } | ||||
| 
 | ||||
|   private def createAdminClient(): Admin = { | ||||
|     val config = new util.HashMap[String, Object] | ||||
|     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, | ||||
|       TestUtils.bootstrapServers(servers, new ListenerName("BROKER"))) | ||||
|     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) | ||||
|     clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) => | ||||
|       config.put(key.toString, value) | ||||
|     } | ||||
|  | @ -230,11 +264,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { | |||
|     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group") | ||||
|     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString) | ||||
| 
 | ||||
|     GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, | ||||
|     GroupedUser(user, userGroup, topic, brokerServers(leader), producerClientId, consumerClientId, | ||||
|       createProducer(), createConsumer(), adminClient) | ||||
|   } | ||||
| 
 | ||||
|   case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker, | ||||
|   case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: BrokerServer, | ||||
|                          producerClientId: String, consumerClientId: String, | ||||
|                          override val producer: KafkaProducer[Array[Byte], Array[Byte]], | ||||
|                          override val consumer: Consumer[Array[Byte], Array[Byte]], | ||||
|  | @ -315,7 +349,7 @@ object GroupedUserPrincipalBuilder { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder { | ||||
| class GroupedUserPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { | ||||
|   override def build(context: AuthenticationContext): KafkaPrincipal = { | ||||
|     val securityProtocol = context.securityProtocol | ||||
|     if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { | ||||
|  | @ -395,7 +429,15 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w | |||
|   } | ||||
| 
 | ||||
|   override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = { | ||||
|     principal match { | ||||
|     val user = principal.getName | ||||
|     val userGroup = group(user) | ||||
|     val newPrincipal = { | ||||
|       if (userGroup.isEmpty) | ||||
|         principal | ||||
|       else | ||||
|         GroupedUserPrincipal(user, userGroup) | ||||
|     } | ||||
|     newPrincipal match { | ||||
|       case groupPrincipal: GroupedUserPrincipal => | ||||
|         val userGroup = groupPrincipal.userGroup | ||||
|         val quotaLimit = quotaOrDefault(userGroup, quotaType) | ||||
|  | @ -468,5 +510,3 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w | |||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -198,6 +198,7 @@ class BrokerMetadataPublisherTest { | |||
|       Some(mock(classOf[ShareCoordinator])), | ||||
|       mock(classOf[DynamicConfigPublisher]), | ||||
|       mock(classOf[DynamicClientQuotaPublisher]), | ||||
|       mock(classOf[DynamicTopicClusterQuotaPublisher]), | ||||
|       mock(classOf[ScramPublisher]), | ||||
|       mock(classOf[DelegationTokenPublisher]), | ||||
|       mock(classOf[AclPublisher]), | ||||
|  |  | |||
|  | @ -236,6 +236,10 @@ public class BrokerRegistration { | |||
|         return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced)); | ||||
|     } | ||||
|      | ||||
|     public List<Node> nodes() { | ||||
|         return listeners.keySet().stream().flatMap(l -> node(l).stream()).toList(); | ||||
|     } | ||||
| 
 | ||||
|     public Map<String, VersionRange> supportedFeatures() { | ||||
|         return supportedFeatures; | ||||
|     } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue