diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java index f557daecdaf..a9cb2bfb2af 100644 --- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java +++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.util.Map; /** - * Quota callback interface for brokers that enables customization of client quota computation. + * Quota callback interface for brokers and controllers that enables customization of client quota computation. */ public interface ClientQuotaCallback extends Configurable { @@ -89,8 +89,9 @@ public interface ClientQuotaCallback extends Configurable { boolean quotaResetRequired(ClientQuotaType quotaType); /** - * Metadata update callback that is invoked whenever UpdateMetadata request is received from - * the controller. This is useful if quota computation takes partitions into account. + * This callback is invoked whenever there are changes in the cluster metadata, such as + * brokers being added or removed, topics being created or deleted, or partition leadership updates. + * This is useful if quota computation takes partitions into account. * Topics that are being deleted will not be included in `cluster`. * * @param cluster Cluster metadata including partitions and their leaders if known diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2f923241bbe..122a469180b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -509,7 +509,15 @@ class BrokerServer( config, sharedServer.metadataPublishingFaultHandler, "broker", - clientQuotaMetadataManager), + clientQuotaMetadataManager, + ), + new DynamicTopicClusterQuotaPublisher( + clusterId, + config, + sharedServer.metadataPublishingFaultHandler, + "broker", + quotaManagers, + ), new ScramPublisher( config, sharedServer.metadataPublishingFaultHandler, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 93657b8b303..65f9f9191d6 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher} +import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.network.ListenerName @@ -326,7 +326,17 @@ class ControllerServer( config, sharedServer.metadataPublishingFaultHandler, "controller", - clientQuotaMetadataManager)) + clientQuotaMetadataManager + )) + + // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics. + metadataPublishers.add(new DynamicTopicClusterQuotaPublisher( + clusterId, + config, + sharedServer.metadataPublishingFaultHandler, + "controller", + quotaManagers, + )) // Set up the SCRAM publisher. metadataPublishers.add(new ScramPublisher( diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index e29b8945932..d8fd26c2b3b 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -19,15 +19,20 @@ package kafka.server import kafka.server.metadata.KRaftMetadataCache import org.apache.kafka.admin.BrokerMetadata +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.MetadataResponseData import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} -import org.apache.kafka.metadata.LeaderAndIsr +import org.apache.kafka.common._ +import org.apache.kafka.image.MetadataImage +import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, PartitionRegistration} import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import java.util +import java.util.Collections +import java.util.concurrent.ThreadLocalRandom import java.util.function.Supplier import scala.collection._ +import scala.jdk.CollectionConverters.CollectionHasAsScala /** * Used to represent the controller id cached in the metadata cache of the broker. This trait is @@ -123,4 +128,95 @@ object MetadataCache { ): KRaftMetadataCache = { new KRaftMetadataCache(brokerId, kraftVersionSupplier) } + + def toCluster(clusterId: String, image: MetadataImage): Cluster = { + val brokerToNodes = new util.HashMap[Integer, util.List[Node]] + image.cluster().brokers() + .values().stream() + .filter(broker => !broker.fenced()) + .forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) } + + def getNodes(id: Int): util.List[Node] = brokerToNodes.get(id) + + val partitionInfos = new util.ArrayList[PartitionInfo] + val internalTopics = new util.HashSet[String] + + def toArray(replicas: Array[Int]): Array[Node] = { + util.Arrays.stream(replicas) + .mapToObj(replica => getNodes(replica)) + .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size)) + } + + val topicImages = image.topics().topicsByName().values() + if (topicImages != null) { + topicImages.forEach { topic => + topic.partitions().forEach { (key, value) => + val partitionId = key + val partition = value + val nodes = getNodes(partition.leader) + if (nodes != null) { + nodes.forEach(node => { + partitionInfos.add(new PartitionInfo(topic.name(), + partitionId, + node, + toArray(partition.replicas), + toArray(partition.isr), + getOfflineReplicas(image, partition).stream() + .map(replica => getNodes(replica)) + .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size)))) + }) + if (Topic.isInternal(topic.name())) { + internalTopics.add(topic.name()) + } + } + } + } + } + + val controllerNode = getNodes(getRandomAliveBroker(image).getOrElse(-1)) match { + case null => Node.noNode() + case nodes => nodes.get(0) + } + // Note: the constructor of Cluster does not allow us to reference unregistered nodes. + // So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not + // registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but + // we are duplicating the behavior of ZkMetadataCache, for now. + new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()), + partitionInfos, Collections.emptySet(), internalTopics, controllerNode) + } + + private def getOfflineReplicas(image: MetadataImage, + partition: PartitionRegistration, + listenerName: ListenerName = null): util.List[Integer] = { + val offlineReplicas = new util.ArrayList[Integer](0) + for (brokerId <- partition.replicas) { + Option(image.cluster().broker(brokerId)) match { + case None => offlineReplicas.add(brokerId) + case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) { + offlineReplicas.add(brokerId) + } + } + } + offlineReplicas + } + + private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = + broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition) + + private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = + !broker.hasOnlineDir(partition.directory(broker.id())) + + private def getRandomAliveBroker(image: MetadataImage): Option[Int] = { + val aliveBrokers = getAliveBrokers(image).toList + if (aliveBrokers.isEmpty) { + None + } else { + Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id) + } + } + + private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = { + image.cluster().brokers().values().asScala.filterNot(_.fenced()). + map(b => new BrokerMetadata(b.id, b.rack)) + } } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index cc8d16b2e7c..08738215b21 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -71,6 +71,7 @@ class BrokerMetadataPublisher( shareCoordinator: Option[ShareCoordinator], var dynamicConfigPublisher: DynamicConfigPublisher, dynamicClientQuotaPublisher: DynamicClientQuotaPublisher, + dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher, scramPublisher: ScramPublisher, delegationTokenPublisher: DelegationTokenPublisher, aclPublisher: AclPublisher, @@ -199,6 +200,9 @@ class BrokerMetadataPublisher( // Apply client quotas delta. dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage) + // Apply topic or cluster quotas delta. + dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage) + // Apply SCRAM delta. scramPublisher.onMetadataUpdate(delta, newImage) diff --git a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala new file mode 100644 index 00000000000..68788ffe3cd --- /dev/null +++ b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala @@ -0,0 +1,71 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package kafka.server.metadata + +import kafka.server.{KafkaConfig, MetadataCache} +import kafka.server.QuotaFactory.QuotaManagers +import kafka.utils.Logging +import org.apache.kafka.image.{MetadataDelta, MetadataImage} +import org.apache.kafka.image.loader.LoaderManifest +import org.apache.kafka.server.fault.FaultHandler + +/** + * Publishing dynamic topic or cluster changes to the client quota manager. + * Temporary solution since Cluster objects are immutable and costly to update for every metadata change. + * See KAFKA-18239 to trace the issue. + */ +class DynamicTopicClusterQuotaPublisher ( + clusterId: String, + conf: KafkaConfig, + faultHandler: FaultHandler, + nodeType: String, + quotaManagers: QuotaManagers +) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { + logIdent = s"[${name()}] " + + override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}" + + override def onMetadataUpdate( + delta: MetadataDelta, + newImage: MetadataImage, + manifest: LoaderManifest + ): Unit = { + onMetadataUpdate(delta, newImage) + } + + def onMetadataUpdate( + delta: MetadataDelta, + newImage: MetadataImage, + ): Unit = { + try { + quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => { + if (delta.topicsDelta() != null || delta.clusterDelta() != null) { + val cluster = MetadataCache.toCluster(clusterId, newImage) + if (clientQuotaCallback.updateClusterMetadata(cluster)) { + quotaManagers.fetch.updateQuotaMetricConfigs() + quotaManagers.produce.updateQuotaMetricConfigs() + quotaManagers.request.updateQuotaMetricConfigs() + quotaManagers.controllerMutation.updateQuotaMetricConfigs() + } + } + }) + } catch { + case t: Throwable => + val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" + faultHandler.handleFault("Uncaught exception while " + + s"publishing dynamic topic or cluster changes from $deltaName", t) + } + } +} + \ No newline at end of file diff --git a/core/src/test/java/kafka/test/api/CustomQuotaCallbackTest.java b/core/src/test/java/kafka/test/api/CustomQuotaCallbackTest.java new file mode 100644 index 00000000000..a7da8ce4eb5 --- /dev/null +++ b/core/src/test/java/kafka/test/api/CustomQuotaCallbackTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; +import org.apache.kafka.server.config.QuotaConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +@ClusterTestDefaults(controllers = 3, + types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), + } +) +@ExtendWith(ClusterTestExtensions.class) +public class CustomQuotaCallbackTest { + + private final ClusterInstance cluster; + + public CustomQuotaCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException { + + try (Admin admin = cluster.admin(Map.of())) { + admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1))); + TestUtils.waitForCondition( + () -> CustomQuotaCallback.COUNTERS.size() == 3 + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + "The CustomQuotaCallback not triggered in all controllers. " + ); + + // Reset the counters, and we expect the callback to be triggered again in all controllers + CustomQuotaCallback.COUNTERS.clear(); + + admin.deleteTopics(List.of("topic")); + TestUtils.waitForCondition( + () -> CustomQuotaCallback.COUNTERS.size() == 3 + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + "The CustomQuotaCallback not triggered in all controllers. " + ); + + } + } + + + public static class CustomQuotaCallback implements ClientQuotaCallback { + + public static final Map COUNTERS = new ConcurrentHashMap<>(); + private String nodeId; + + @Override + public Map quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) { + return Map.of(); + } + + @Override + public Double quotaLimit(ClientQuotaType quotaType, Map metricTags) { + return Double.MAX_VALUE; + } + + @Override + public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) { + + } + + @Override + public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) { + + } + + @Override + public boolean quotaResetRequired(ClientQuotaType quotaType) { + return true; + } + + @Override + public boolean updateClusterMetadata(Cluster cluster) { + COUNTERS.computeIfAbsent(nodeId, k -> new AtomicInteger()).incrementAndGet(); + return true; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + nodeId = (String) configs.get("node.id"); + } + + } +} diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index 4d443a7f9b7..3d09c667371 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -26,11 +26,13 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth._ +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.{Cluster, Reconfigurable} +import org.apache.kafka.metadata.storage.Formatter import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs} import org.apache.kafka.server.quota._ import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -38,11 +40,10 @@ import java.util.Properties import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.{lang, util} -import scala.collection.Seq import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ +import scala.util.Using -@Disabled("KAFKA-18213") class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { override protected def securityProtocol = SecurityProtocol.SASL_SSL @@ -70,6 +71,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}", classOf[GroupedUserPrincipalBuilder].getName) this.serverConfig.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true") + val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism) + superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext) super.setUp(testInfo) producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, @@ -85,13 +88,23 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { closeSasl() } - override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { - super.configureSecurityBeforeServersStart(testInfo) + override def configureSecurityAfterServersStart(): Unit = { + super.configureSecurityAfterServersStart() createScramCredentials(createAdminClient(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) } + override def addFormatterSettings(formatter: Formatter): Unit = { + formatter.setScramArguments( + util.List.of(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]")) + } + + override def createPrivilegedAdminClient() = { + createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties, + kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCustomQuotaCallback(quorum: String, groupProtocol: String): Unit = { // Large quota override, should not throttle var brokerId = 0 @@ -141,6 +154,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { // Remove quota override and test default quota applied with scaling based on partitions user = addUser("group1_user2", brokerId) + user.removeQuotaOverrides() user.waitForQuotaUpdate(defaultProduceQuota / 100, defaultConsumeQuota / 100, defaultRequestQuota) user.removeThrottleMetrics() // since group was throttled before user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false) @@ -178,14 +192,34 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { user } + override def deleteTopic( + topic: String, + listenerName: ListenerName = listenerName + ): Unit = { + Using.resource(createAdminClient()) { admin => + TestUtils.deleteTopicWithAdmin( + admin = admin, + topic = topic, + brokers = aliveBrokers, + controllers = controllerServers) + } + } + private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = { - // TODO createTopic + val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap + TestUtils.createTopicWithAdmin( + createAdminClient(), + topic, + brokers, + controllerServers, + numPartitions, + replicaAssignment = assignment + ) } private def createAdminClient(): Admin = { val config = new util.HashMap[String, Object] - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, - TestUtils.bootstrapServers(servers, new ListenerName("BROKER"))) + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) => config.put(key.toString, value) } @@ -230,11 +264,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group") consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString) - GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, + GroupedUser(user, userGroup, topic, brokerServers(leader), producerClientId, consumerClientId, createProducer(), createConsumer(), adminClient) } - case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker, + case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: BrokerServer, producerClientId: String, consumerClientId: String, override val producer: KafkaProducer[Array[Byte], Array[Byte]], override val consumer: Consumer[Array[Byte], Array[Byte]], @@ -315,7 +349,7 @@ object GroupedUserPrincipalBuilder { } } -class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder { +class GroupedUserPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { val securityProtocol = context.securityProtocol if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { @@ -395,7 +429,15 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w } override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = { - principal match { + val user = principal.getName + val userGroup = group(user) + val newPrincipal = { + if (userGroup.isEmpty) + principal + else + GroupedUserPrincipal(user, userGroup) + } + newPrincipal match { case groupPrincipal: GroupedUserPrincipal => val userGroup = groupPrincipal.userGroup val quotaLimit = quotaOrDefault(userGroup, quotaType) @@ -468,5 +510,3 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w } } } - - diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index df383b25bf6..a166368a5aa 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -198,6 +198,7 @@ class BrokerMetadataPublisherTest { Some(mock(classOf[ShareCoordinator])), mock(classOf[DynamicConfigPublisher]), mock(classOf[DynamicClientQuotaPublisher]), + mock(classOf[DynamicTopicClusterQuotaPublisher]), mock(classOf[ScramPublisher]), mock(classOf[DelegationTokenPublisher]), mock(classOf[AclPublisher]), diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java index a7409da27a4..18d8382a633 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -235,6 +235,10 @@ public class BrokerRegistration { } return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced)); } + + public List nodes() { + return listeners.keySet().stream().flatMap(l -> node(l).stream()).toList(); + } public Map supportedFeatures() { return supportedFeatures;