diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 89df1a4ec3e..e4d3ae834fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -53,6 +53,10 @@ public final class KafkaMetric implements Metric {
         }
     }
 
+    public Measurable measurable() {
+        return this.measurable;
+    }
+
     double value(long timeMs) {
         return this.measurable.measure(config, timeMs);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index fe43940b19c..9dfc457b091 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -59,10 +59,34 @@ public class Rate implements MeasurableStat {
     @Override
     public double measure(MetricConfig config, long now) {
         double value = stat.measure(config, now);
-        // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete
-        long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs;
-        long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1);
-        return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs);
+        return value / convert(windowSize(config, now));
+    }
+
+    public long windowSize(MetricConfig config, long now) {
+        // purge old samples before we compute the window size
+        stat.purgeObsoleteSamples(config, now);
+
+        /*
+         * Here we check the total amount of time elapsed since the oldest non-obsolete window.
+         * This gives the total window size of the batch, which is the time used for the rate computation.
+         * However, there is an issue if we do not have sufficient data: for example, if only 1 second has elapsed in a
+         * 30-second window, the measured rate will be very high.
+         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         *
+         * Note that we could simply count the amount of time elapsed in the current window and add N-1 windows to get the total time,
+         * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called;
+         * if record is not called for a period of time, that time is not accounted for in the window size and produces incorrect results.
+         */
+        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+        // Check how many full windows of data we currently have retained
+        int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
+        int minFullWindows = config.samples() - 1;
+
+        // If fewer full windows are available than the minimum required, add the difference to the total elapsed time
+        if (numFullWindows < minFullWindows)
+            totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
+
+        return totalElapsedTimeMs;
     }
 
     private double convert(long timeMs) {
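To make the window padding above concrete, here is a minimal standalone sketch of the same calculation that windowSize performs (plain Java; the class and method names are illustrative and not part of the patch):

public class WindowSizeSketch {

    // Mirrors Rate.windowSize: pad the observed elapsed time up to at least
    // (samples - 1) complete windows so a nearly empty sample set does not inflate the rate.
    static long paddedWindowSizeMs(long totalElapsedTimeMs, long timeWindowMs, int samples) {
        int numFullWindows = (int) (totalElapsedTimeMs / timeWindowMs);
        int minFullWindows = samples - 1;
        if (numFullWindows < minFullWindows)
            totalElapsedTimeMs += (minFullWindows - numFullWindows) * timeWindowMs;
        return totalElapsedTimeMs;
    }

    public static void main(String[] args) {
        // Only 1 second elapsed, with 30-second windows and 3 samples configured:
        // without padding the rate denominator would be 1 s; with padding it is 61 s.
        System.out.println(paddedWindowSizeMs(1_000L, 30_000L, 3));  // prints 61000
        // Two full windows plus half a window elapsed: no padding is needed.
        System.out.println(paddedWindowSizeMs(75_000L, 30_000L, 3)); // prints 75000
    }
}

Padding the denominator up to samples - 1 complete windows keeps a nearly empty sample set from reporting an artificially high rate.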
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 8d3e33ddbd8..90583870a07 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -384,6 +384,34 @@ public class MetricsTest {
         assertEquals(0.0, p75.value(), 1.0);
     }
 
+    @Test
+    public void testRateWindowing() throws Exception {
+        // Use the default time window. Set 3 samples.
+        MetricConfig cfg = new MetricConfig().samples(3);
+        Sensor s = metrics.sensor("test.sensor", cfg);
+        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+
+        int sum = 0;
+        int count = cfg.samples() - 1;
+        // Advance 1 window after every record
+        for (int i = 0; i < count; i++) {
+            s.record(100);
+            sum += 100;
+            time.sleep(cfg.timeWindowMs());
+        }
+
+        // Sleep for half the window.
+        time.sleep(cfg.timeWindowMs() / 2);
+
+        // The elapsed time is (samples - 1) full windows plus the half window just slept.
+        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+
+        KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1"));
+        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
+        assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
+            ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
+    }
+
     public static class ConstantMeasurable implements Measurable {
         public double value = 0.0;
 
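As a quick check of the values asserted in testRateWindowing, the arithmetic can be reproduced in a few lines of plain Java (assuming the default 30-second time window, which matches the "75 seconds" assertion message; the class name is illustrative):

public class RateWindowingArithmetic {

    public static void main(String[] args) {
        long timeWindowMs = 30_000L;            // assumed MetricConfig default time window
        int samples = 3;
        int sum = 100 * (samples - 1);          // two records of 100, one per full window
        // (samples - 1) full windows plus the half window slept at the end
        double elapsedSecs = (timeWindowMs * (samples - 1) + timeWindowMs / 2) / 1000.0;
        System.out.println(elapsedSecs);        // 75.0
        System.out.println(sum / elapsedSecs);  // 2.666..., the expected Rate value
    }
}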
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index b21785f8974..24f294d2b43 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -123,13 +123,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       case qve: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
-        throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
+    // If the request is not throttled, a throttleTime of 0 ms is recorded
+    clientSensors.throttleTimeSensor.record(throttleTimeMs)
     throttleTimeMs
   }
 
@@ -141,11 +142,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   * we need to add a delay of X to W such that O * W / (W + X) = T.
   * Solving for X, we get X = (O - T)/T * W.
   */
-  private def throttleTime(metricValue: Double, config: MetricConfig): Int = {
+  private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+    val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
-    val difference = metricValue - quota.bound
-    val time = difference / quota.bound * config.timeWindowMs() * config.samples()
-    time.round.toInt
+    val difference = clientMetric.value() - quota.bound
+    // Use the precise window used by the rate calculation
+    val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
+    throttleTimeMs.round.toInt
+  }
+
+  // Cast to Rate because only Rate metrics are used in quota computation
+  private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = {
+    measurable match {
+      case r: Rate => r
+      case _ => throw new IllegalArgumentException(s"Metric $name is not a Rate metric, value $measurable")
+    }
   }
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 3343c53991f..38b3dbd102d 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -156,7 +156,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                                 RequestKeys.nameForKey(RequestKeys.ProduceKey),
                                                 "Tracking throttle-time per client",
                                                 "client-id", producerId2)
-    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value())
+    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value())
 
     // The "client" consumer does not get throttled.
     consume(consumers(1), numRecords)
@@ -167,7 +167,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                                 RequestKeys.nameForKey(RequestKeys.FetchKey),
                                                 "Tracking throttle-time per client",
                                                 "client-id", consumerId2)
-    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value())
+    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value())
   }
 
   def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 997928cfe5d..75e856a0ba2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -71,10 +71,13 @@ class ClientQuotaManagerTest {
     Assert.assertEquals(0, queueSizeMetric.value().toInt)
 
     // Create a spike.
-    // 400*10 + 2000 = 6000/10 = 600 bytes per second.
-    // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200
-    val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
-    Assert.assertEquals("Should be throttled", 2200, sleepTime)
+    // (400*10 + 2300) / 10.5 = 6300 / 10.5 = 600 bytes per second.
+    // (600 - quota) / quota * window-size = (600 - 500) / 500 * 10.5 seconds = 2100 ms
+    // The window is 10.5 seconds because the last window is only half complete.
+    time.sleep(500)
+    val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback)
+
+    Assert.assertEquals("Should be throttled", 2100, sleepTime)
     Assert.assertEquals(1, queueSizeMetric.value().toInt)
     // After a request is delayed, the callback cannot be triggered immediately
     clientMetrics.throttledRequestReaper.doWork()
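The throttle-time formula from the ClientQuotaManager javadoc, X = (O - T) / T * W, applied to the numbers in ClientQuotaManagerTest gives the expected 2100 ms. A minimal sketch of that arithmetic (plain Java rather than the Scala used in the manager; the class and method names are illustrative):

public class ThrottleTimeArithmetic {

    // X = (O - T) / T * W, where O is the observed rate, T the quota bound and W the rate's window size.
    static long throttleTimeMs(double observedRate, double quotaBound, long windowSizeMs) {
        double difference = observedRate - quotaBound;
        return Math.round(difference / quotaBound * windowSizeMs);
    }

    public static void main(String[] args) {
        // ClientQuotaManagerTest numbers: 6300 bytes over 10.5 seconds = 600 bytes/sec,
        // against a 500 bytes/sec quota, measured over a 10.5-second rate window.
        System.out.println(throttleTimeMs(600.0, 500.0, 10_500L)); // prints 2100
    }
}

Using the rate's own windowSize rather than timeWindowMs * samples is what makes the computed delay agree with the elapsed time the rate was actually measured over.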