KAFKA-2209; Change quotas dynamically using DynamicConfigManager

Changes in this patch are:
1. ClientIdConfigHandler now passes through the config changes to the quota manager.
2. Removed static KafkaConfigs for quota overrides. These are no longer needed since we can override configs through ZooKeeper.
3. Added testcases to verify that the config changes are propogated from ZK (written using AdminTools) to the actual Metric objects.

Author: Aditya Auradkar <aauradka@aauradka-mn1.(none)>
Author: Aditya Auradkar <aauradka@aauradka-mn1.linkedin.biz>

Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #298 from auradkar/K-2209
This commit is contained in:
Aditya Auradkar 2015-10-21 16:07:39 -07:00 committed by Jun Rao
parent 361686d4a9
commit d9b1dc7081
15 changed files with 216 additions and 167 deletions

View File

@ -29,11 +29,11 @@ public final class Quota {
this.upper = upper;
}
public static Quota lessThan(double upperBound) {
public static Quota upperBound(double upperBound) {
return new Quota(upperBound, true);
}
public static Quota moreThan(double lowerBound) {
public static Quota lowerBound(double lowerBound) {
return new Quota(lowerBound, false);
}

View File

@ -39,7 +39,7 @@ public class Avg extends SampledStat {
total += s.value;
count += s.eventCount;
}
return total / count;
return count == 0 ? 0 : total / count;
}
}

View File

@ -320,8 +320,8 @@ public class MetricsTest {
@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.record(5.0);
try {
sensor.record(1.0);
@ -341,12 +341,12 @@ public class MetricsTest {
@Test
public void testQuotasEquality() {
final Quota quota1 = Quota.lessThan(10.5);
final Quota quota2 = Quota.moreThan(10.5);
final Quota quota1 = Quota.upperBound(10.5);
final Quota quota2 = Quota.lowerBound(10.5);
assertFalse("Quota with different upper values shouldn't be equal", quota1.equals(quota2));
final Quota quota3 = Quota.moreThan(10.5);
final Quota quota3 = Quota.lowerBound(10.5);
assertTrue("Quota with same upper and bound values should be equal", quota2.equals(quota3));
}

View File

@ -98,7 +98,7 @@ object AdminUtils extends Logging {
/**
* Add partitions to existing topic with optional replica assignment
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param topic Topic for adding partitions to
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
@ -177,7 +177,7 @@ object AdminUtils extends Logging {
/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
@ -194,7 +194,7 @@ object AdminUtils extends Logging {
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
@ -216,7 +216,7 @@ object AdminUtils extends Logging {
/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param topic Topic of the consumer group information we wish to delete
*/
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
@ -294,7 +294,7 @@ object AdminUtils extends Logging {
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
@ -306,7 +306,7 @@ object AdminUtils extends Logging {
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
@ -379,6 +379,9 @@ object AdminUtils extends Logging {
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
@ -387,8 +390,6 @@ object AdminUtils extends Logging {
topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
}
private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
if(zkUtils.pathExists(getTopicPath(topic))) {
val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get

View File

@ -16,7 +16,7 @@
*/
package kafka.server
import java.util.concurrent.{DelayQueue, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import kafka.utils.{ShutdownableThread, Logging}
import org.apache.kafka.common.MetricName
@ -36,15 +36,12 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor
/**
* Configuration settings for quota management
* @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
* @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y"
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*
*/
case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides: String =
ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides,
numQuotaSamples: Int =
ClientQuotaManagerConfig.DefaultNumQuotaSamples,
quotaWindowSizeSeconds: Int =
@ -52,11 +49,9 @@ case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
object ClientQuotaManagerConfig {
val QuotaBytesPerSecondDefault = Long.MaxValue
val QuotaBytesPerSecondOverrides = ""
// Always have 10 whole windows + 1 current window
val DefaultNumQuotaSamples = 11
val DefaultQuotaWindowSizeSeconds = 1
val MaxThrottleTimeSeconds = 30
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600
}
@ -73,8 +68,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val apiKey: String,
private val time: Time) extends Logging {
private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault)
private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
@ -124,13 +119,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
clientSensors.throttleTimeSensor.record(throttleTimeMs)
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
// If delayed, add the element to the delayQueue
logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
// If the request is not throttled, a throttleTime of 0 ms is recorded
clientSensors.throttleTimeSensor.record(throttleTimeMs)
throttleTimeMs
}
@ -160,10 +154,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
/**
* Returns the consumer quota for the specified clientId
* @return
* Returns the quota for the specified clientId
*/
private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota)
def quota(clientId: String): Quota =
if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota;
/*
* This function either returns the sensors for a given client id or creates them if they don't exist
@ -172,8 +166,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
// Names of the sensors to access
val quotaSensorName = apiKey + "-" + clientId
val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
val quotaSensorName = getQuotaSensorName(clientId)
val throttleTimeSensorName = getThrottleTimeSensorName(clientId)
var quotaSensor: Sensor = null
var throttleTimeSensor: Sensor = null
@ -231,6 +225,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
ClientSensors(quotaSensor, throttleTimeSensor)
}
private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId
private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId
private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
@ -238,21 +236,38 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
.quota(quota)
}
/* Construct a Map of (clientId -> Quota)
* The input config is specified as a comma-separated K=V pairs
/**
* Overrides quotas per clientId
* @param clientId client to override
* @param quota custom quota to apply
*/
private def initQuotaMap(input: String): Map[String, Quota] = {
// If empty input, return an empty map
if (input.trim.length == 0)
Map[String, Quota]()
else
input.split(",").map(entry => {
val trimmedEntry = entry.trim
val pair: Array[String] = trimmedEntry.split("=")
if (pair.length != 2)
throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
pair(0) -> new Quota(pair(1).toDouble, true)
}).toMap
def updateQuota(clientId: String, quota: Quota) = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
* If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map.
* The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change
* notifications
*/
lock.writeLock().lock()
try {
logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")
if (quota.equals(defaultQuota))
this.overriddenQuota.remove(clientId)
else
this.overriddenQuota.put(clientId, quota)
// Change the underlying metric config if the sensor has been created
val allMetrics = metrics.metrics()
val quotaMetricName = clientRateMetricName(clientId)
if (allMetrics.containsKey(quotaMetricName)) {
logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota))
}
} finally {
lock.writeLock().unlock()
}
}
private def clientRateMetricName(clientId: String): MetricName = {

View File

@ -21,22 +21,24 @@ import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.log.{Log, LogConfig, LogManager}
import kafka.utils.Pool
import kafka.api.RequestKeys
import org.apache.kafka.common.metrics.Quota
import scala.collection.mutable
import scala.collection.Map
/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
*/
trait ConfigHandler {
def processConfigChanges(entityName : String, value : Properties)
def processConfigChanges(entityName: String, value: Properties)
}
/**
* The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK
*/
class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{
class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
def processConfigChanges(topic : String, topicConfig : Properties) {
val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
@ -55,15 +57,27 @@ class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandl
}
}
object ClientConfigOverride {
val ProducerOverride = "producer_byte_rate"
val ConsumerOverride = "consumer_byte_rate"
}
/**
* The ClientIdConfigHandler will process clientId config changes in ZK.
* The callback provides the clientId and the full properties set read from ZK.
* This implementation does nothing currently. In the future, it will change quotas per client
* This implementation reports the overrides to the respective ClientQuotaManager objects
*/
class ClientIdConfigHandler extends ConfigHandler {
val configPool = new Pool[String, Properties]()
class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {
def processConfigChanges(clientId : String, clientConfig : Properties): Unit = {
configPool.put(clientId, clientConfig)
def processConfigChanges(clientId: String, clientConfig: Properties) = {
if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
}
if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
quotaManagers(RequestKeys.FetchKey).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
}
}
}

View File

@ -71,7 +71,7 @@ object ConfigType {
*
*/
class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandler : Map[String, ConfigHandler],
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
@ -138,7 +138,9 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
case Some(value: String) => value
case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
}
configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity))
val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
configHandlers(entityType).processConfigChanges(entity, entityConfig)
case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1," +

View File

@ -30,7 +30,6 @@ import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
@ -55,7 +54,7 @@ class KafkaApis(val requestChannel: RequestChannel,
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
// Store all the quota managers for each type of request
private val quotaManagers = instantiateQuotaManagers(config)
val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)
/**
* Top-level method that handles all requests and multiplexes to the right api
@ -784,14 +783,12 @@ class KafkaApis(val requestChannel: RequestChannel,
private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
val producerQuotaManagerCfg = ClientQuotaManagerConfig(
quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
val consumerQuotaManagerCfg = ClientQuotaManagerConfig(
quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)

View File

@ -142,8 +142,6 @@ object Defaults {
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides
val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
@ -294,8 +292,6 @@ object KafkaConfig {
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
val NumQuotaSamplesProp = "quota.window.num"
val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
@ -468,10 +464,6 @@ object KafkaConfig {
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " +
"Example: clientIdX=10485760,clientIdY=10485760"
val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. " +
"Example: clientIdX=10485760,clientIdY=10485760"
val NumQuotaSamplesDoc = "The number of samples to retain in memory"
val QuotaWindowSizeSecondsDoc = "The time span of each sample"
@ -644,8 +636,6 @@ object KafkaConfig {
/** ********* Quota configuration ***********/
.define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
.define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
.define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc)
.define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc)
.define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
.define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
@ -846,8 +836,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp)
val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp)
val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)

View File

@ -209,7 +209,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
ConfigType.Client -> new ClientIdConfigHandler)
ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
// Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
// TODO: Move this logic to DynamicConfigManager
AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
}
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()

View File

@ -16,21 +16,23 @@ package kafka.api
import java.util.Properties
import junit.framework.Assert
import kafka.admin.AdminUtils
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.{KafkaServer, KafkaConfig}
import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric
import org.junit.Assert._
import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.{After, Before, Test}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable
class QuotasTest extends KafkaServerTestHarness {
@ -47,10 +49,6 @@ class QuotasTest extends KafkaServerTestHarness {
overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
// un-throttled
overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue)
override def generateConfigs() = {
FixedPortTestUtils.createBrokerConfigs(numServers,
zkConnect,
@ -110,7 +108,6 @@ class QuotasTest extends KafkaServerTestHarness {
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
consumers += new KafkaConsumer(consumerProps)
replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
}
@After
@ -132,7 +129,7 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.ProduceKey),
"Tracking throttle-time per client",
"client-id", producerId1)
Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
// Consumer should read in a bursty manner and get throttled immediately
consume(consumers.head, numRecords)
@ -143,11 +140,29 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.FetchKey),
"Tracking throttle-time per client",
"client-id", consumerId1)
Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
}
@Test
def testProducerConsumerOverrideUnthrottled() {
// Give effectively unlimited quota for producerId2 and consumerId2
val props = new Properties()
props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString)
props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString)
AdminUtils.changeClientIdConfig(zkUtils, producerId2, props)
AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props)
TestUtils.retry(10000) {
val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers
val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2)
val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2)
assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
}
val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
val numRecords = 1000
produce(producers(1), numRecords)
@ -155,7 +170,7 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.ProduceKey),
"Tracking throttle-time per client",
"client-id", producerId2)
Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value())
assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
// The "client" consumer does not get throttled.
consume(consumers(1), numRecords)
@ -166,7 +181,7 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.FetchKey),
"Tracking throttle-time per client",
"client-id", consumerId2)
Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value())
assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
}
def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {

View File

@ -17,6 +17,8 @@
package kafka.admin
import junit.framework.Assert._
import kafka.api.RequestKeys
import org.apache.kafka.common.metrics.Quota
import org.junit.Test
import java.util.Properties
import kafka.utils._
@ -28,6 +30,7 @@ import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
import java.io.File
import TestUtils._
import scala.collection.{Map, immutable}
class AdminTest extends ZooKeeperTestHarness with Logging {
@ -102,7 +105,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
10 -> List(1, 2, 3),
11 -> List(1, 3, 4)
)
val leaderForPartitionMap = Map(
val leaderForPartitionMap = immutable.Map(
0 -> 0,
1 -> 1,
2 -> 2,
@ -417,4 +420,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
/**
* This test simulates a client config change in ZK whose notification has been purged.
* Basically, it asserts that notifications are bootstrapped from ZK
*/
@Test
def testBootstrapClientIdConfig() {
val clientId = "my-client"
val props = new Properties()
props.setProperty("producer_byte_rate", "1000")
props.setProperty("consumer_byte_rate", "2000")
// Write config without notification to ZK.
val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
val map = Map("version" -> 1, "config" -> configMap)
zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map))
val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
assertEquals("Must have 1 overriden client config", 1, configInZk.size)
assertEquals(props, configInZk(clientId))
// Test that the existing clientId overrides are read
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
try {
assertEquals(new Quota(1000, true), server.apis.quotaManagers(RequestKeys.ProduceKey).quota(clientId));
assertEquals(new Quota(2000, true), server.apis.quotaManagers(RequestKeys.FetchKey).quota(clientId));
} finally {
server.shutdown()
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
}

View File

@ -21,13 +21,13 @@ import java.util.Collections
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
import org.apache.kafka.common.utils.MockTime
import org.junit.{Assert, Before, Test}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Before, Test}
class ClientQuotaManagerTest {
private val time = new MockTime
private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
var numCallbacks: Int = 0
def callback(delayTimeMs: Int) {
@ -42,13 +42,34 @@ class ClientQuotaManagerTest {
@Test
def testQuotaParsing() {
val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
// Case 1: Update the quota. Assert that the new quota value is returned
clientMetrics.updateQuota("p1", new Quota(2000, true));
clientMetrics.updateQuota("p2", new Quota(4000, true));
try {
Assert.assertEquals("Default producer quota should be 500",
new Quota(500, true), clientMetrics.quota("random-client-id"))
Assert.assertEquals("Should return the overridden value (2000)",
new Quota(2000, true), clientMetrics.quota("p1"))
Assert.assertEquals("Should return the overridden value (4000)",
new Quota(4000, true), clientMetrics.quota("p2"))
assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id"))
assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota("p1"))
assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2"))
// p1 should be throttled using the overridden quota
var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * config.numQuotaSamples, this.callback)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
// Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
// p1 should not longer be throttled after the quota change
clientMetrics.updateQuota("p1", new Quota(3000, true));
assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1"))
throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
// Case 3: Change quota back to default. Should be throttled again
clientMetrics.updateQuota("p1", new Quota(500, true));
assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1"))
throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
} finally {
clientMetrics.shutdown()
}
@ -67,8 +88,8 @@ class ClientQuotaManagerTest {
clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
time.sleep(1000)
}
Assert.assertEquals(10, numCallbacks)
Assert.assertEquals(0, queueSizeMetric.value().toInt)
assertEquals(10, numCallbacks)
assertEquals(0, queueSizeMetric.value().toInt)
// Create a spike.
// 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
@ -77,17 +98,17 @@ class ClientQuotaManagerTest {
time.sleep(500)
val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback)
Assert.assertEquals("Should be throttled", 2100, sleepTime)
Assert.assertEquals(1, queueSizeMetric.value().toInt)
assertEquals("Should be throttled", 2100, sleepTime)
assertEquals(1, queueSizeMetric.value().toInt)
// After a request is delayed, the callback cannot be triggered immediately
clientMetrics.throttledRequestReaper.doWork()
Assert.assertEquals(10, numCallbacks)
assertEquals(10, numCallbacks)
time.sleep(sleepTime)
// Callback can only be triggered after the the delay time passes
clientMetrics.throttledRequestReaper.doWork()
Assert.assertEquals(0, queueSizeMetric.value().toInt)
Assert.assertEquals(11, numCallbacks)
assertEquals(0, queueSizeMetric.value().toInt)
assertEquals(11, numCallbacks)
// Could continue to see delays until the bursty sample disappears
for (i <- 0 until 10) {
@ -95,65 +116,13 @@ class ClientQuotaManagerTest {
time.sleep(1000)
}
Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
assertEquals("Should be unthrottled since bursty sample has rolled over",
0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
} finally {
clientMetrics.shutdown()
}
}
@Test
def testOverrideParse() {
var testConfig = ClientQuotaManagerConfig()
var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
try {
// Case 1 - Default config
Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true),
clientMetrics.quota("p1"))
} finally {
clientMetrics.shutdown()
}
// Case 2 - Empty override
testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
try {
Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
} finally {
clientMetrics.shutdown()
}
// Case 3 - NumberFormatException for override
testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
try {
clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
}
catch {
// Swallow.
case nfe: NumberFormatException =>
}
// Case 4 - IllegalArgumentException for override
testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
quotaBytesPerSecondOverrides = "p1=2000=3000")
try {
clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time)
Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
}
catch {
// Swallow.
case nfe: IllegalArgumentException =>
}
}
def newMetrics: Metrics = {
new Metrics(new MetricConfig(), Collections.emptyList(), time)
}

View File

@ -18,7 +18,9 @@ package kafka.server
import java.util.Properties
import junit.framework.Assert._
import org.junit.Assert._
import kafka.api.RequestKeys
import org.apache.kafka.common.metrics.Quota
import org.easymock.{Capture, EasyMock}
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
@ -27,6 +29,8 @@ import kafka.common._
import kafka.log.LogConfig
import kafka.admin.{AdminOperationException, AdminUtils}
import scala.collection.Map
class DynamicConfigChangeTest extends KafkaServerTestHarness {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@ -52,22 +56,26 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
// For now client config changes do not do anything. Simply verify that the call was made
@Test
def testClientConfigChange() {
def testClientQuotaConfigChange() {
assertTrue("Should contain a ConfigHandler for topics",
this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
val clientId = "testClient"
val props = new Properties()
props.put("a.b", "c")
props.put("x.y", "z")
props.put(ClientConfigOverride.ProducerOverride, "1000")
props.put(ClientConfigOverride.ConsumerOverride, "2000")
AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
TestUtils.retry(10000) {
val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size)
assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b"))
assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y"))
val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers
val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(clientId)
val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(clientId)
assertEquals(s"ClientId $clientId must have overridden producer quota of 1000",
Quota.upperBound(1000), overrideProducerQuota)
assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
Quota.upperBound(2000), overrideConsumerQuota)
}
}

View File

@ -479,8 +479,6 @@ class KafkaConfigTest {
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")