mirror of https://github.com/apache/kafka.git
Merge 6a0a765967
into 4a5aa37169
This commit is contained in:
commit
5353b436ba
|
@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
|
|||
import org.apache.kafka.coordinator.transaction.ProducerIdManager
|
||||
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
|
||||
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
|
||||
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
|
||||
|
@ -487,7 +487,7 @@ class BrokerServer(
|
|||
dynamicConfigHandlers.toMap,
|
||||
"broker"),
|
||||
new DynamicClientQuotaPublisher(
|
||||
config,
|
||||
config.nodeId,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
"broker",
|
||||
clientQuotaMetadataManager,
|
||||
|
|
|
@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
|
|||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
|
||||
import scala.collection.immutable
|
||||
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
|
||||
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
|
||||
import kafka.utils.{CoreUtils, Logging}
|
||||
import org.apache.kafka.common.internals.Plugin
|
||||
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
||||
|
@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
|
|||
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
|
||||
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
||||
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
|
||||
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
|
||||
|
@ -334,7 +334,7 @@ class ControllerServer(
|
|||
// Set up the client quotas publisher. This will enable controller mutation quotas and any
|
||||
// other quotas which are applicable.
|
||||
metadataPublishers.add(new DynamicClientQuotaPublisher(
|
||||
config,
|
||||
config.nodeId,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
"controller",
|
||||
clientQuotaMetadataManager
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
|
|||
import org.apache.kafka.image.loader.LoaderManifest
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher
|
||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
|
||||
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
|
||||
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
|
||||
import org.apache.kafka.server.fault.FaultHandler
|
||||
|
@ -218,7 +218,7 @@ class BrokerMetadataPublisher(
|
|||
dynamicConfigPublisher.onMetadataUpdate(delta, newImage)
|
||||
|
||||
// Apply client quotas delta.
|
||||
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
|
||||
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)
|
||||
|
||||
// Apply topic or cluster quotas delta.
|
||||
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Sanitizer
|
|||
import java.net.{InetAddress, UnknownHostException}
|
||||
import java.util.Optional
|
||||
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
|
||||
import org.apache.kafka.metadata.publisher.ClientQuotaUpdater
|
||||
import org.apache.kafka.server.config.QuotaConfig
|
||||
import org.apache.kafka.server.quota.ClientQuotaManager
|
||||
|
||||
|
@ -51,7 +52,7 @@ case object DefaultUserDefaultClientIdEntity extends QuotaEntity
|
|||
* Process quota metadata records as they appear in the metadata log and update quota managers and cache as necessary
|
||||
*/
|
||||
class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManagers,
|
||||
private[metadata] val connectionQuotas: ConnectionQuotas) extends Logging {
|
||||
private[metadata] val connectionQuotas: ConnectionQuotas) extends ClientQuotaUpdater with Logging {
|
||||
|
||||
def update(quotasDelta: ClientQuotasDelta): Unit = {
|
||||
quotasDelta.changes().forEach { (key, value) =>
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server.metadata
|
||||
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.image.loader.LoaderManifest
|
||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
|
||||
import org.apache.kafka.server.fault.FaultHandler
|
||||
|
||||
|
||||
class DynamicClientQuotaPublisher(
|
||||
conf: KafkaConfig,
|
||||
faultHandler: FaultHandler,
|
||||
nodeType: String,
|
||||
clientQuotaMetadataManager: ClientQuotaMetadataManager,
|
||||
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
|
||||
logIdent = s"[${name()}] "
|
||||
|
||||
override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}"
|
||||
|
||||
override def onMetadataUpdate(
|
||||
delta: MetadataDelta,
|
||||
newImage: MetadataImage,
|
||||
manifest: LoaderManifest
|
||||
): Unit = {
|
||||
onMetadataUpdate(delta, newImage)
|
||||
}
|
||||
|
||||
def onMetadataUpdate(
|
||||
delta: MetadataDelta,
|
||||
newImage: MetadataImage,
|
||||
): Unit = {
|
||||
val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
|
||||
try {
|
||||
Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
|
||||
clientQuotaMetadataManager.update(clientQuotasDelta)
|
||||
}
|
||||
} catch {
|
||||
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
|
||||
s"publishing dynamic client quota changes from $deltaName", t)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
|
|||
import org.apache.kafka.coordinator.share.ShareCoordinator
|
||||
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
|
||||
import org.apache.kafka.image.loader.LogDeltaManifest
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.LeaderAndEpoch
|
||||
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.metadata.publisher;
|
||||
|
||||
import org.apache.kafka.image.ClientQuotasDelta;
|
||||
|
||||
/**
|
||||
* Interface for updating client quotas based on metadata changes.
|
||||
*/
|
||||
public interface ClientQuotaUpdater {
|
||||
/**
|
||||
* Update client quotas based on the given delta.
|
||||
*
|
||||
* @param quotasDelta The client quotas delta to apply
|
||||
*/
|
||||
void update(ClientQuotasDelta quotasDelta);
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.metadata.publisher;
|
||||
|
||||
import org.apache.kafka.image.ClientQuotasDelta;
|
||||
import org.apache.kafka.image.MetadataDelta;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.image.loader.LoaderManifest;
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher;
|
||||
import org.apache.kafka.server.fault.FaultHandler;
|
||||
|
||||
/**
|
||||
* Publishes dynamic client quota changes to the client quota metadata manager.
|
||||
*/
|
||||
public class DynamicClientQuotaPublisher implements MetadataPublisher {
|
||||
private final int nodeId;
|
||||
private final FaultHandler faultHandler;
|
||||
private final String nodeType;
|
||||
private final ClientQuotaUpdater clientQuotaUpdater;
|
||||
|
||||
public DynamicClientQuotaPublisher(
|
||||
int nodeId,
|
||||
FaultHandler faultHandler,
|
||||
String nodeType,
|
||||
ClientQuotaUpdater clientQuotaUpdater
|
||||
) {
|
||||
this.nodeId = nodeId;
|
||||
this.faultHandler = faultHandler;
|
||||
this.nodeType = nodeType;
|
||||
this.clientQuotaUpdater = clientQuotaUpdater;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "DynamicClientQuotaPublisher " + nodeType + " id=" + nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
|
||||
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
|
||||
ClientQuotasDelta clientQuotasDelta = delta.clientQuotasDelta();
|
||||
if (clientQuotasDelta != null) {
|
||||
try {
|
||||
clientQuotaUpdater.update(clientQuotasDelta);
|
||||
} catch (Throwable t) {
|
||||
faultHandler.handleFault("Uncaught exception while publishing dynamic client quota changes from " + deltaName, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue