mirror of https://github.com/apache/kafka.git
KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III) (#9114)
Based on the discussion in #9072, I have put together an alternative approach. This one does the following: instead of changing the implementation of the Rate to behave like a token bucket, it uses two different metrics: the regular Rate and a new TokenBucket. The latter is used to enforce the quota. The token bucket uses the quota's rate as the refill rate for the credits and computes the burst from the number of samples and their length (# samples * sample length * quota). The token count can go below zero in order to handle unlimited bursts (e.g. creating a topic with a number of partitions higher than the burst). Throttling kicks in when the number of credits is below zero, and the throttle time is computed as the number of credits below zero divided by the refill rate (the quota). Only the controller mutation quota uses it for now. The remaining number of credits in the bucket is exposed via the tokens metric per user/clientId. Reviewers: Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
parent b351493543
commit b5f90daf13
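To make the algorithm described above concrete, here is a minimal, standalone Java sketch of the token bucket variant this commit introduces (illustrative only; the class and field names are not the Kafka implementation): credits refill at the quota rate, are capped at samples * window * quota, may go below zero on a large burst, and the throttle time is the time needed to refill back to zero.

// A minimal, standalone sketch of the token bucket described in the commit message.
// Illustrative only: not the Kafka TokenBucket stat, just the same idea.
final class TokenBucketSketch {
    private final double quota;   // refill rate, in permitted operations per second
    private final double burst;   // maximum credits = samples * windowSeconds * quota
    private double credits;
    private long lastUpdateMs;

    TokenBucketSketch(double quota, int samples, double windowSeconds, long nowMs) {
        this.quota = quota;
        this.burst = samples * windowSeconds * quota;
        this.credits = burst;      // start full (the Kafka stat starts empty and fills on first use)
        this.lastUpdateMs = nowMs;
    }

    void record(double value, long nowMs) {
        refill(nowMs);
        credits = Math.min(burst, credits - value);   // may go below zero: unlimited burst
    }

    boolean exhausted(long nowMs) {
        refill(nowMs);
        return credits < 0;                           // throttling kicks in below zero
    }

    long throttleTimeMs(long nowMs) {
        refill(nowMs);
        return credits >= 0 ? 0 : Math.round(-credits / quota * 1000);
    }

    private void refill(long nowMs) {
        credits = Math.min(burst, credits + quota * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }
}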
@@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.utils.Time;

import java.util.ArrayList;

@@ -43,7 +44,7 @@ public final class Sensor {
private final Metrics registry;
private final String name;
private final Sensor[] parents;
private final List<Stat> stats;
private final List<StatAndConfig> stats;
private final Map<MetricName, KafkaMetric> metrics;
private final MetricConfig config;
private final Time time;

@@ -51,6 +52,16 @@ public final class Sensor {
private final long inactiveSensorExpirationTimeMs;
private final Object metricLock;

private static class StatAndConfig {
public final Stat stat;
public final MetricConfig config;

StatAndConfig(Stat stat, MetricConfig config) {
this.stat = stat;
this.config = config;
}
}

public enum RecordingLevel {
INFO(0, "INFO"), DEBUG(1, "DEBUG");

@@ -199,8 +210,9 @@ public final class Sensor {
synchronized (this) {
synchronized (metricLock()) {
// increment all the stats
for (Stat stat : this.stats)
stat.record(config, value, timeMs);
for (StatAndConfig statAndConfig : this.stats) {
statAndConfig.stat.record(statAndConfig.config, value, timeMs);
}
}
if (checkQuotas)
checkQuotas(timeMs);

@@ -223,8 +235,14 @@ public final class Sensor {
Quota quota = config.quota();
if (quota != null) {
double value = metric.measurableValue(timeMs);
if (!quota.acceptable(value)) {
throw new QuotaViolationException(metric, value, quota.bound());
if (metric.measurable() instanceof TokenBucket) {
if (value < 0) {
throw new QuotaViolationException(metric, value, quota.bound());
}
} else {
if (!quota.acceptable(value)) {
throw new QuotaViolationException(metric, value, quota.bound());
}
}
}
}

@@ -251,10 +269,11 @@ public final class Sensor {
if (hasExpired())
return false;

this.stats.add(Objects.requireNonNull(stat));
final MetricConfig statConfig = config == null ? this.config : config;
stats.add(new StatAndConfig(Objects.requireNonNull(stat), statConfig));
Object lock = metricLock();
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
if (!metrics.containsKey(metric.metricName())) {
registry.registerMetric(metric);
metrics.put(metric.metricName(), metric);

@@ -287,16 +306,17 @@ public final class Sensor {
} else if (metrics.containsKey(metricName)) {
return true;
} else {
final MetricConfig statConfig = config == null ? this.config : config;
final KafkaMetric metric = new KafkaMetric(
metricLock(),
Objects.requireNonNull(metricName),
Objects.requireNonNull(stat),
config == null ? this.config : config,
statConfig,
time
);
registry.registerMetric(metric);
metrics.put(metric.metricName(), metric);
stats.add(stat);
stats.add(new StatAndConfig(Objects.requireNonNull(stat), statConfig));
return true;
}
}
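The Sensor changes above boil down to two things: each stat now carries its own MetricConfig (via StatAndConfig), and a quota backed by a TokenBucket is considered violated when the measured value drops below zero rather than when it exceeds the bound. The following sketch (not part of the patch; the class name and metric names are made up) shows how a caller could combine an unquoted Rate and a quota-bound TokenBucket on a single sensor using the APIs that appear in this diff.

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.utils.MockTime;

public class PerStatConfigExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics(new MockTime(0, System.currentTimeMillis(), 0));
        Sensor sensor = metrics.sensor("controller-mutation-example");

        // Reporting-only Rate: no explicit config, so it falls back to the default config (no quota).
        sensor.add(metrics.metricName("rate", "example-group"), new Rate());

        // Quota-bound TokenBucket with its own MetricConfig:
        // refill rate = 2 credits/s, burst = 10 samples * 1s * 2 = 20 credits.
        sensor.add(metrics.metricName("tokens", "example-group"), new TokenBucket(),
            new MetricConfig().quota(Quota.upperBound(2)).timeWindow(1, TimeUnit.SECONDS).samples(10));

        try {
            sensor.record(30); // records into both stats and then checks the quotas
        } catch (QuotaViolationException e) {
            // The bucket is at 20 - 30 = -10 credits, so the check fails.
            // Throttle time = -(-10) / 2 = 5 seconds of refill.
            System.out.println("credits: " + e.value() + ", throttle seconds: " + (-e.value() / e.bound()));
        }
        metrics.close();
    }
}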
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.internals;

import java.util.concurrent.TimeUnit;

public class MetricsUtils {
/**
* Converts the provided time from milliseconds to the requested
* time unit.
*/
public static double convert(long timeMs, TimeUnit unit) {
switch (unit) {
case NANOSECONDS:
return timeMs * 1000.0 * 1000.0;
case MICROSECONDS:
return timeMs * 1000.0;
case MILLISECONDS:
return timeMs;
case SECONDS:
return timeMs / 1000.0;
case MINUTES:
return timeMs / (60.0 * 1000.0);
case HOURS:
return timeMs / (60.0 * 60.0 * 1000.0);
case DAYS:
return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
default:
throw new IllegalStateException("Unknown unit: " + unit);
}
}
}
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;

/**
* The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic

@@ -63,7 +64,7 @@ public class Rate implements MeasurableStat {
@Override
public double measure(MetricConfig config, long now) {
double value = stat.measure(config, now);
return value / convert(windowSize(config, now));
return value / convert(windowSize(config, now), unit);
}

public long windowSize(MetricConfig config, long now) {

@@ -93,27 +94,6 @@ public class Rate implements MeasurableStat {
return totalElapsedTimeMs;
}

private double convert(long timeMs) {
switch (unit) {
case NANOSECONDS:
return timeMs * 1000.0 * 1000.0;
case MICROSECONDS:
return timeMs * 1000.0;
case MILLISECONDS:
return timeMs;
case SECONDS:
return timeMs / 1000.0;
case MINUTES:
return timeMs / (60.0 * 1000.0);
case HOURS:
return timeMs / (60.0 * 60.0 * 1000.0);
case DAYS:
return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
default:
throw new IllegalStateException("Unknown unit: " + unit);
}
}

/**
* @deprecated since 2.4 Use {@link WindowedSum} instead.
*/
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Quota;

import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;

/**
* The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
* that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
*
* The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
* the maximum number of credits of the bucket is defined by
* {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
*
* The quota is considered as exhausted when the amount of remaining credits in the bucket
* is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
*
* Token Bucket vs Rate based Quota:
* The current sampled rate based quota does not cope well with bursty workloads. The issue is
* that a unique and large sample can hold the average above the quota until it is discarded.
* Practically, when this happens, one must wait until the sample is expired to bring the rate
* below the quota even though less time would be theoretically required. As an example, let's
* imagine that we have:
* - Quota (Q) = 5
* - Samples (S) = 100
* - Window (W) = 1s
* A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The expected throttle time is
* computed as follows: ((R - Q) / Q * S * W) = ((5.6 - 5) / 5 * 100 * 1) = 12 secs. In practice,
* the average rate won't go below the quota before the burst is dropped from the samples so one
* must wait 100s (S * W).
*
* The token bucket relies on a continuously updated amount of credits. Therefore, it does not
* suffer from the above issue. The same example would work as follows:
* - Quota (Q) = 5
* - Burst (B) = 5 * 1 * 100 = 500 (Q * S * W)
* A burst of 560 brings the amount of credits to -60. One must wait 12s (-(-60)/5) to refill the
* bucket to zero.
*/
public class TokenBucket implements MeasurableStat {
private final TimeUnit unit;
private double tokens;
private long lastUpdateMs;

public TokenBucket() {
this(TimeUnit.SECONDS);
}

public TokenBucket(TimeUnit unit) {
this.unit = unit;
this.tokens = 0;
this.lastUpdateMs = 0;
}

@Override
public double measure(final MetricConfig config, final long timeMs) {
if (config.quota() == null)
return Long.MAX_VALUE;
final double quota = config.quota().bound();
final double burst = burst(config);
refill(quota, burst, timeMs);
return this.tokens;
}

@Override
public void record(final MetricConfig config, final double value, final long timeMs) {
if (config.quota() == null)
return;
final double quota = config.quota().bound();
final double burst = burst(config);
refill(quota, burst, timeMs);
this.tokens = Math.min(burst, this.tokens - value);
}

private void refill(final double quota, final double burst, final long timeMs) {
this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs, unit));
this.lastUpdateMs = timeMs;
}

private double burst(final MetricConfig config) {
return config.samples() * convert(config.timeWindowMs(), unit) * config.quota().bound();
}
}
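To tie the worked example from the Javadoc above to the new classes, here is a small illustrative snippet (not part of the patch; the class name and the use of MockTime are assumptions) with Q = 5 credits/s, S = 100 samples and W = 1s: a burst of 560 drives the credits to -60, and 12s of refill at 5 credits/s brings the bucket back to zero.

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class TokenBucketJavadocExample {
    public static void main(String[] args) {
        // Q = 5 credits/s, S = 100 samples, W = 1s => burst = 5 * 100 * 1 = 500 credits
        MetricConfig config = new MetricConfig()
            .quota(Quota.upperBound(5))
            .timeWindow(1, TimeUnit.SECONDS)
            .samples(100);
        Time time = new MockTime(0, System.currentTimeMillis(), 0);
        TokenBucket bucket = new TokenBucket();

        // The first refill fills the bucket to its burst; a spike of 560 leaves -60 credits.
        bucket.record(config, 560, time.milliseconds());
        System.out.println(bucket.measure(config, time.milliseconds())); // -60.0

        // -(-60) / 5 = 12s of refill brings the credits back to zero.
        time.sleep(12_000);
        System.out.println(bucket.measure(config, time.milliseconds())); // 0.0
    }
}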
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;

@@ -39,12 +40,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@@ -209,4 +212,95 @@ public class SensorTest {

assertThat(sensor.hasMetrics(), is(true));
}

@Test
public void testStrictQuotaEnforcementWithRate() {
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
final Metrics metrics = new Metrics(time);
final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
.quota(Quota.upperBound(2))
.timeWindow(1, TimeUnit.SECONDS)
.samples(11));
final MetricName metricName = metrics.metricName("rate", "test-group");
assertTrue(sensor.add(metricName, new Rate()));
final KafkaMetric rateMetric = metrics.metric(metricName);

// Recording a first value at T+0 to bring the avg rate to 3 which is already
// above the quota.
strictRecord(sensor, 30, time.milliseconds());
assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);

// Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
// ((30 / 10) - 2) / 2 * 10 = 5s
time.sleep(5000);

// But, recording a second value is rejected because the avg rate is still equal
// to 3 after 5s.
assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));

metrics.close();
}

@Test
public void testStrictQuotaEnforcementWithTokenBucket() {
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
final Metrics metrics = new Metrics(time);
final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
.quota(Quota.upperBound(2))
.timeWindow(1, TimeUnit.SECONDS)
.samples(10));
final MetricName metricName = metrics.metricName("credits", "test-group");
assertTrue(sensor.add(metricName, new TokenBucket()));
final KafkaMetric tkMetric = metrics.metric(metricName);

// Recording a first value at T+0 to bring the remaining credits below zero
strictRecord(sensor, 30, time.milliseconds());
assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);

// Theoretically, we should wait 5s to bring the remaining credits back to zero:
// 10 / 2 = 5s
time.sleep(5000);

// Unlike the default rate based on a windowed sum, it works as expected.
assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
strictRecord(sensor, 30, time.milliseconds());
assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);

metrics.close();
}

private void strictRecord(Sensor sensor, double value, long timeMs) {
synchronized (sensor) {
sensor.checkQuotas(timeMs);
sensor.record(value, timeMs, false);
}
}

@Test
public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
final Metrics metrics = new Metrics(time);
final Sensor sensor = metrics.sensor("sensor");

final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
final MetricName stat1Name = metrics.metricName("stat1", "test-group");
final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5));
sensor.add(stat1Name, stat1, stat1Config);

final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
final MetricName stat2Name = metrics.metricName("stat2", "test-group");
final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10));
sensor.add(stat2Name, stat2, stat2Config);

sensor.record(10, 1);
Mockito.verify(stat1).record(stat1Config, 10, 1);
Mockito.verify(stat2).record(stat2Config, 10, 1);

Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
sensor.checkQuotas(2);

metrics.close();
}
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;

import static org.junit.Assert.assertEquals;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Before;
import org.junit.Test;

public class TokenBucketTest {
Time time;

@Before
public void setup() {
time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
}

@Test
public void testRecord() {
// Rate = 5 units / sec
// Burst = 5 * 2 * 10 = 100 units
MetricConfig config = new MetricConfig()
.quota(Quota.upperBound(5))
.timeWindow(2, TimeUnit.SECONDS)
.samples(10);

TokenBucket tk = new TokenBucket();

// Expect 100 credits at T
assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);

// Record 60 at T, expect 40 credits
tk.record(config, 60, time.milliseconds());
assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);

// Advance by 2s, record 5, expect 45 credits
time.sleep(2000);
tk.record(config, 5, time.milliseconds());
assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);

// Advance by 2s, record 60, expect -5 credits
time.sleep(2000);
tk.record(config, 60, time.milliseconds());
assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
}

@Test
public void testUnrecord() {
// Rate = 5 units / sec
// Burst = 5 * 2 * 10 = 100 units
MetricConfig config = new MetricConfig()
.quota(Quota.upperBound(5))
.timeWindow(2, TimeUnit.SECONDS)
.samples(10);

TokenBucket tk = new TokenBucket();

// Expect 100 credits at T
assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);

// Record -60 at T, expect 100 credits
tk.record(config, -60, time.milliseconds());
assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);

// Advance by 2s, record 60, expect 40 credits
time.sleep(2000);
tk.record(config, 60, time.milliseconds());
assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);

// Advance by 2s, record -60, expect 100 credits
time.sleep(2000);
tk.record(config, -60, time.milliseconds());
assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
}
}
@@ -394,16 +394,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
sensorAccessor.getOrCreate(
getQuotaSensorName(metricTags),
ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
clientRateMetricName(metricTags),
Some(getQuotaMetricConfig(metricTags)),
new Rate
registerQuotaMetrics(metricTags)
),
sensorAccessor.getOrCreate(
getThrottleTimeSensorName(metricTags),
ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
throttleMetricName(metricTags),
None,
new Avg
sensor => sensor.add(throttleMetricName(metricTags), new Avg)
)
)
if (quotaCallback.quotaResetRequired(clientQuotaType))

@@ -411,6 +407,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
sensors
}

protected def registerQuotaMetrics(metricTags: Map[String, String])(sensor: Sensor): Unit = {
sensor.add(
clientRateMetricName(metricTags),
new Rate,
getQuotaMetricConfig(metricTags)
)
}

private def metricTagsToSensorSuffix(metricTags: Map[String, String]): String =
metricTags.values.mkString(":")

@@ -420,7 +424,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def getQuotaSensorName(metricTags: Map[String, String]): String =
s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"

private def getQuotaMetricConfig(metricTags: Map[String, String]): MetricConfig = {
protected def getQuotaMetricConfig(metricTags: Map[String, String]): MetricConfig = {
getQuotaMetricConfig(quotaLimit(metricTags.asJava))
}

@@ -435,9 +439,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
sensorAccessor.getOrCreate(
sensorName,
ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
metricName,
None,
new Rate
sensor => sensor.add(metricName, new Rate)
)
}
@@ -23,6 +23,8 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.QuotaViolationException
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.quota.ClientQuotaCallback

@@ -60,7 +62,7 @@ abstract class AbstractControllerMutationQuota(private val time: Time) extends C
protected var lastRecordedTimeMs = 0L

protected def updateThrottleTime(e: QuotaViolationException, timeMs: Long): Unit = {
lastThrottleTimeMs = ClientQuotaManager.throttleTime(e, timeMs)
lastThrottleTimeMs = ControllerMutationQuotaManager.throttleTimeMs(e, timeMs)
lastRecordedTimeMs = timeMs
}

@@ -131,6 +133,22 @@ class PermissiveControllerMutationQuota(private val time: Time,

object ControllerMutationQuotaManager {
val QuotaControllerMutationDefault = Int.MaxValue.toDouble

/**
* This calculates the amount of time needed to bring the TokenBucket within quota
* assuming that no new metrics are recorded.
*
* Basically, if a value < 0 is observed, the time required to bring it to zero is
* -value / refill rate (quota bound) * 1000.
*/
def throttleTimeMs(e: QuotaViolationException, timeMs: Long): Long = {
e.metric().measurable() match {
case _: TokenBucket =>
Math.round(-e.value() / e.bound() * 1000)
case _ => throw new IllegalArgumentException(
s"Metric ${e.metric().metricName()} is not a TokenBucket metric, value ${e.metric().measurable()}")
}
}
}

/**
@@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
quotaMetricTags.asJava)
}

private def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("tokens", QuotaType.ControllerMutation.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
quotaMetricTags.asJava)
}

override protected def registerQuotaMetrics(metricTags: Map[String, String])(sensor: Sensor): Unit = {
sensor.add(
clientRateMetricName(metricTags),
new Rate
)
sensor.add(
clientTokenBucketMetricName(metricTags),
new TokenBucket,
getQuotaMetricConfig(metricTags)
)
}

/**
* Records that a user/clientId accumulated or would like to accumulate the provided amount at
* the specified time, and returns the throttle time in milliseconds. The quota is strict, meaning that it

@@ -179,7 +215,7 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
0
} catch {
case e: QuotaViolationException =>
val throttleTimeMs = throttleTime(e, timeMs).toInt
val throttleTimeMs = ControllerMutationQuotaManager.throttleTimeMs(e, timeMs).toInt
debug(s"Quota violated for sensor (${quotaSensor.name}). Delay time: ($throttleTimeMs)")
throttleTimeMs
}
@@ -194,9 +194,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
sensorAccess.getOrCreate(
replicationType.toString,
InactiveSensorExpirationTimeSeconds,
rateMetricName,
Some(getQuotaMetricConfig(quota)),
new SimpleRate
sensor => sensor.add(rateMetricName, new SimpleRate, getQuotaMetricConfig(quota))
)
}
}
@@ -18,8 +18,7 @@ package kafka.server

import java.util.concurrent.locks.ReadWriteLock

import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, Sensor}
import org.apache.kafka.common.metrics.{Metrics, Sensor}

/**
* Class which centralises the logic for creating/accessing sensors.

@@ -29,8 +28,7 @@ import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, S
*/
class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {

def getOrCreate(sensorName: String, expirationTime: Long,
metricName: => MetricName, config: => Option[MetricConfig], measure: => MeasurableStat): Sensor = {
def getOrCreate(sensorName: String, expirationTime: Long, registerMetrics: Sensor => Unit): Sensor = {
var sensor: Sensor = null

/* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads.

@@ -61,8 +59,8 @@ class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
// ensure that we initialise `ClientSensors` with non-null parameters.
sensor = metrics.getSensor(sensorName)
if (sensor == null) {
sensor = metrics.sensor(sensorName, config.orNull, expirationTime)
sensor.add(metricName, measure)
sensor = metrics.sensor(sensorName, null, expirationTime)
registerMetrics(sensor)
}
} finally {
lock.writeLock().unlock()
@@ -23,7 +23,8 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.metrics.QuotaViolationException
import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.utils.MockTime
import org.junit.Assert._
import org.junit.Assert.assertEquals

@@ -38,26 +39,26 @@ class StrictControllerMutationQuotaTest {
val sensor = metrics.sensor("sensor", new MetricConfig()
.quota(Quota.upperBound(10))
.timeWindow(1, TimeUnit.SECONDS)
.samples(11))
.samples(10))
val metricName = metrics.metricName("rate", "test-group")
assertTrue(sensor.add(metricName, new Rate))
assertTrue(sensor.add(metricName, new TokenBucket))

val quota = new StrictControllerMutationQuota(time, sensor)
assertFalse(quota.isExceeded)

// Recording a first value at T to bring the avg rate to 9. Value is accepted
// Recording a first value at T to bring the tokens to 10. Value is accepted
// because the quota is not exhausted yet.
quota.record(90)
assertFalse(quota.isExceeded)
assertEquals(0, quota.throttleTime)

// Recording a second value at T to bring the avg rate to 18. Value is accepted
// Recording a second value at T to bring the tokens to -80. Value is accepted
quota.record(90)
assertFalse(quota.isExceeded)
assertEquals(0, quota.throttleTime)

// Recording a third value at T is rejected immediately and rate is not updated
// because the quota is exhausted.
// Recording a third value at T is rejected immediately because there are no
// tokens available in the bucket.
assertThrows(classOf[ThrottlingQuotaExceededException],
() => quota.record(90))
assertTrue(quota.isExceeded)

@@ -79,26 +80,26 @@ class PermissiveControllerMutationQuotaTest {
val sensor = metrics.sensor("sensor", new MetricConfig()
.quota(Quota.upperBound(10))
.timeWindow(1, TimeUnit.SECONDS)
.samples(11))
.samples(10))
val metricName = metrics.metricName("rate", "test-group")
assertTrue(sensor.add(metricName, new Rate))
assertTrue(sensor.add(metricName, new TokenBucket))

val quota = new PermissiveControllerMutationQuota(time, sensor)
assertFalse(quota.isExceeded)

// Recording a first value at T to bring the avg rate to 9. Value is accepted
// Recording a first value at T to bring the tokens to 10. Value is accepted
// because the quota is not exhausted yet.
quota.record(90)
assertFalse(quota.isExceeded)
assertEquals(0, quota.throttleTime)

// Recording a second value at T to bring the avg rate to 18. Value is accepted
// Recording a second value at T to bring the tokens to -80. Value is accepted
quota.record(90)
assertFalse(quota.isExceeded)
assertEquals(8000, quota.throttleTime)

// Recording a third value at T to bring the avg rate to 27. Value is accepted
// and rate is updated even though the quota is exhausted.
// Recording a third value at T to bring the tokens to -170. Value is accepted
// even though the quota is exhausted.
quota.record(90)
assertFalse(quota.isExceeded) // quota is never exceeded
assertEquals(17000, quota.throttleTime)

@@ -115,7 +116,10 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest {
private val User = "ANONYMOUS"
private val ClientId = "test-client"

private val config = ClientQuotaManagerConfig()
private val config = ClientQuotaManagerConfig(
numQuotaSamples = 10,
quotaWindowSizeSeconds = 1
)

private def withQuotaManager(f: ControllerMutationQuotaManager => Unit): Unit = {
val quotaManager = new ControllerMutationQuotaManager(config, metrics, time, "", None)

@@ -126,6 +130,22 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest {
}
}

@Test
def testThrottleTime(): Unit = {
import ControllerMutationQuotaManager._

val time = new MockTime(0, System.currentTimeMillis, 0)
val metrics = new Metrics(time)
val sensor = metrics.sensor("sensor")
val metricName = metrics.metricName("tokens", "test-group")
sensor.add(metricName, new TokenBucket)
val metric = metrics.metric(metricName)

assertEquals(0, throttleTimeMs(new QuotaViolationException(metric, 0, 10), time.milliseconds()))
assertEquals(500, throttleTimeMs(new QuotaViolationException(metric, -5, 10), time.milliseconds()))
assertEquals(1000, throttleTimeMs(new QuotaViolationException(metric, -10, 10), time.milliseconds()))
}

@Test
def testControllerMutationQuotaViolation(): Unit = {
withQuotaManager { quotaManager =>

@@ -142,19 +162,19 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest {
assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)

// Create a spike worth of 110 mutations.
// Current avg rate = 10 * 10 = 100/10 = 10 mutations per second.
// Current tokens in the bucket = 100
// As we use the Strict enforcement, the quota is checked before updating the rate. Hence,
// the spike is accepted and no quota violation error is raised.
var throttleTime = maybeRecord(quotaManager, User, ClientId, 110)
assertEquals("Should not be throttled", 0, throttleTime)

// Create a spike worth of 110 mutations.
// Current avg rate = 10 * 10 + 110 = 210/10 = 21 mutations per second.
// Current tokens in the bucket = 100 - 110 = -10
// As the quota is already violated, the spike is rejected immediately without updating the
// rate. The client must wait:
// (21 - quota) / quota * window-size = (21 - 10) / 10 * 10 = 11 seconds
// 10 / 10 = 1s
throttleTime = maybeRecord(quotaManager, User, ClientId, 110)
assertEquals("Should be throttled", 11000, throttleTime)
assertEquals("Should be throttled", 1000, throttleTime)

// Throttle
throttle(quotaManager, User, ClientId, throttleTime, callback)

@@ -171,7 +191,7 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest {
assertEquals(1, numCallbacks)

// Retry to spike worth of 110 mutations after having waited the required throttle time.
// Current avg rate = 0 = 0/11 = 0 mutations per second.
// Current tokens in the bucket = 0
throttleTime = maybeRecord(quotaManager, User, ClientId, 110)
assertEquals("Should be throttled", 0, throttleTime)
}
@@ -94,8 +94,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.PrincipalBuilderClassProp,
classOf[ControllerMutationQuotaTest.TestPrincipalBuilder].getName)
// We use the default number of samples and window size.
properties.put(KafkaConfig.NumControllerQuotaSamplesProp, "11")
// Specify number of samples and window size.
properties.put(KafkaConfig.NumControllerQuotaSamplesProp, "10")
properties.put(KafkaConfig.ControllerQuotaWindowSizeSecondsProp, "1")
}

@@ -130,7 +130,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Create two topics worth of 30 partitions each. As we use a strict quota, we
// expect one to be created and one to be rejected.
// Theoretically, the throttle time should be below or equal to:
// ((30 / 10) - 2) / 2 * 10 = 5s
// -(-10) / 2 = 5s
val (throttleTimeMs1, errors1) = createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
assertThrottleTime(5000, throttleTimeMs1)
// Ordering is not guaranteed so we only check the errors

@@ -153,7 +153,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Create two topics worth of 30 partitions each. As we use a permissive quota, we
// expect both topics to be created.
// Theoretically, the throttle time should be below or equal to:
// ((60 / 10) - 2) / 2 * 10 = 20s
// -(-40) / 2 = 20s
val (throttleTimeMs, errors) = createTopics(TopicsWith30Partitions, PermissiveCreateTopicsRequestVersion)
assertThrottleTime(20000, throttleTimeMs)
assertEquals(Map(Topic1 -> Errors.NONE, Topic2 -> Errors.NONE), errors)

@@ -181,7 +181,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Delete two topics worth of 30 partitions each. As we use a strict quota, we
// expect the first topic to be deleted and the second to be rejected.
// Theoretically, the throttle time should be below or equal to:
// ((30 / 10) - 2) / 2 * 10 = 5s
// -(-10) / 2 = 5s
val (throttleTimeMs1, errors1) = deleteTopics(TopicsWith30Partitions, StrictDeleteTopicsRequestVersion)
assertThrottleTime(5000, throttleTimeMs1)
// Ordering is not guaranteed so we only check the errors

@@ -208,7 +208,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Delete two topics worth of 30 partitions each. As we use a permissive quota, we
// expect both topics to be deleted.
// Theoretically, the throttle time should be below or equal to:
// ((60 / 10) - 2) / 2 * 10 = 20s
// -(-40) / 2 = 20s
val (throttleTimeMs, errors) = deleteTopics(TopicsWith30Partitions, PermissiveDeleteTopicsRequestVersion)
assertThrottleTime(20000, throttleTimeMs)
assertEquals(Map(Topic1 -> Errors.NONE, Topic2 -> Errors.NONE), errors)

@@ -238,7 +238,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Add 30 partitions to each topic. As we use a strict quota, we
// expect the first topic to be extended and the second to be rejected.
// Theoretically, the throttle time should be below or equal to:
// ((30 / 10) - 2) / 2 * 10 = 5s
// -(-10) / 2 = 5s
val (throttleTimeMs1, errors1) = createPartitions(TopicsWith31Partitions, StrictCreatePartitionsRequestVersion)
assertThrottleTime(5000, throttleTimeMs1)
// Ordering is not guaranteed so we only check the errors

@@ -265,7 +265,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
// Create two topics worth of 30 partitions each. As we use a permissive quota, we
// expect both topics to be created.
// Theoretically, the throttle time should be below or equal to:
// ((60 / 10) - 2) / 2 * 10 = 20s
// -(-40) / 2 = 20s
val (throttleTimeMs, errors) = createPartitions(TopicsWith31Partitions, PermissiveCreatePartitionsRequestVersion)
assertThrottleTime(20000, throttleTimeMs)
assertEquals(Map(Topic1 -> Errors.NONE, Topic2 -> Errors.NONE), errors)