From e98e239a0c7fd9a500de606b7a05582f1a384da4 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 23 Sep 2019 07:12:45 -0700 Subject: [PATCH] KAFKA-8859: Refactor cache-level metrics (#7367) Cache-level metrics are refactor according to KIP-444: tag client-id changed to thread-id name hitRatio changed to hit-ratio made backward compatible by using streams config built.in.metrics.version Reviewers: Guozhang Wang , Bill Bejeck --- .../internals/metrics/StreamsMetricsImpl.java | 83 ++++++++++--- .../streams/state/internals/NamedCache.java | 88 +++---------- .../internals/metrics/NamedCacheMetrics.java | 86 +++++++++++++ .../integration/MetricsIntegrationTest.java | 44 +++++-- .../metrics/StreamsMetricsImplTest.java | 46 ++++++- .../state/internals/NamedCacheTest.java | 33 +---- .../metrics/NamedCacheMetricsTest.java | 117 ++++++++++++++++++ 7 files changed, 367 insertions(+), 130 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetrics.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 1e7c1aee480..961bec80044 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeCount; import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; @@ -63,9 +64,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { private static final String SENSOR_PREFIX_DELIMITER = "."; private static final String SENSOR_NAME_DELIMITER = ".s."; - public static final String THREAD_ID_TAG = "client-id"; + public static final String THREAD_ID_TAG = "thread-id"; + public static final String THREAD_ID_TAG_0100_TO_23 = "client-id"; public static final String TASK_ID_TAG = "task-id"; public static final String STORE_ID_TAG = "state-id"; + public static final String RECORD_CACHE_ID_TAG = "record-cache-id"; public static final String ROLLUP_VALUE = "all"; @@ -77,6 +80,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String TOTAL_SUFFIX = "-total"; public static final String RATIO_SUFFIX = "-ratio"; + public static final String AVG_VALUE_DOC = "The average value of "; + public static final String MAX_VALUE_DOC = "The maximum value of "; + public static final String AVG_LATENCY_DOC = "The average latency of "; + public static final String MAX_LATENCY_DOC = "The maximum latency of "; + public static final String GROUP_PREFIX_WO_DELIMITER = "stream"; public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-"; public static final String GROUP_SUFFIX = "-metrics"; @@ -84,6 +92,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX; public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX; public static final String STATE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX; + public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX; public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -130,7 +139,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { public Map threadLevelTagMap() { final Map tagMap = new LinkedHashMap<>(); - tagMap.put(THREAD_ID_TAG, threadName); + tagMap.put(THREAD_ID_TAG_0100_TO_23, threadName); return tagMap; } @@ -237,12 +246,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "node" + SENSOR_PREFIX_DELIMITER + processorNodeName; } - public final Sensor cacheLevelSensor(final String taskName, - final String cacheName, - final String sensorName, - final Sensor.RecordingLevel recordingLevel, - final Sensor... parents) { - final String key = cacheSensorPrefix(taskName, cacheName); + public Sensor cacheLevelSensor(final String taskName, + final String storeName, + final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { + final String key = cacheSensorPrefix(taskName, storeName); synchronized (cacheLevelSensors) { if (!cacheLevelSensors.containsKey(key)) { cacheLevelSensors.put(key, new LinkedList<>()); @@ -258,6 +267,18 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } + public Map cacheLevelTagMap(final String taskName, final String storeName) { + final Map tagMap = new LinkedHashMap<>(); + tagMap.put(TASK_ID_TAG, taskName); + tagMap.put(RECORD_CACHE_ID_TAG, storeName); + if (version == Version.FROM_100_TO_23) { + tagMap.put(THREAD_ID_TAG_0100_TO_23, Thread.currentThread().getName()); + } else { + tagMap.put(THREAD_ID_TAG, Thread.currentThread().getName()); + } + return tagMap; + } + public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) { final String key = cacheSensorPrefix(taskName, cacheName); synchronized (cacheLevelSensors) { @@ -423,15 +444,17 @@ public class StreamsMetricsImpl implements StreamsMetrics { } - public static void addAvgAndMaxToSensor(final Sensor sensor, + private static void addAvgAndMaxToSensor(final Sensor sensor, final String group, final Map tags, - final String operation) { + final String operation, + final String descriptionOfAvg, + final String descriptionOfMax) { sensor.add( new MetricName( operation + AVG_SUFFIX, group, - "The average value of " + operation + ".", + descriptionOfAvg, tags), new Avg() ); @@ -439,12 +462,26 @@ public class StreamsMetricsImpl implements StreamsMetrics { new MetricName( operation + MAX_SUFFIX, group, - "The max value of " + operation + ".", + descriptionOfMax, tags), new Max() ); } + public static void addAvgAndMaxToSensor(final Sensor sensor, + final String group, + final Map tags, + final String operation) { + addAvgAndMaxToSensor( + sensor, + group, + tags, + operation, + AVG_VALUE_DOC + operation + ".", + MAX_VALUE_DOC + operation + "." + ); + } + public static void addAvgAndMaxLatencyToSensor(final Sensor sensor, final String group, final Map tags, @@ -453,7 +490,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { new MetricName( operation + "-latency-avg", group, - "The average latency of " + operation + " operation.", + AVG_LATENCY_DOC + operation + " operation.", tags), new Avg() ); @@ -461,12 +498,30 @@ public class StreamsMetricsImpl implements StreamsMetrics { new MetricName( operation + "-latency-max", group, - "The max latency of " + operation + " operation.", + MAX_LATENCY_DOC + operation + " operation.", tags), new Max() ); } + public static void addAvgAndMinAndMaxToSensor(final Sensor sensor, + final String group, + final Map tags, + final String operation, + final String descriptionOfAvg, + final String descriptionOfMin, + final String descriptionOfMax) { + addAvgAndMaxToSensor(sensor, group, tags, operation, descriptionOfAvg, descriptionOfMax); + sensor.add( + new MetricName( + operation + MIN_SUFFIX, + group, + descriptionOfMin, + tags), + new Min() + ); + } + public static void addInvocationRateAndCountToSensor(final Sensor sensor, final String group, final Map tags, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index a1d0aab2770..2d53440ef47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -19,14 +19,12 @@ package org.apache.kafka.streams.state.internals; import java.util.NavigableMap; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.kafka.common.MetricName; + import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Min; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.metrics.NamedCacheMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,19 +32,22 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; import java.util.Set; class NamedCache { private static final Logger log = LoggerFactory.getLogger(NamedCache.class); private final String name; + private final String storeName; + private final String taskName; private final NavigableMap cache = new TreeMap<>(); private final Set dirtyKeys = new LinkedHashSet<>(); private ThreadCache.DirtyEntryFlushListener listener; private LRUNode tail; private LRUNode head; private long currentSizeBytes; - private final NamedCacheMetrics namedCacheMetrics; + + private final StreamsMetricsImpl streamsMetrics; + private final Sensor hitRatioSensor; // internal stats private long numReadHits = 0; @@ -54,9 +55,12 @@ class NamedCache { private long numOverwrites = 0; private long numFlushes = 0; - NamedCache(final String name, final StreamsMetricsImpl metrics) { + NamedCache(final String name, final StreamsMetricsImpl streamsMetrics) { this.name = name; - this.namedCacheMetrics = new NamedCacheMetrics(metrics, name); + this.streamsMetrics = streamsMetrics; + storeName = ThreadCache.underlyingStoreNamefromCacheName(name); + taskName = ThreadCache.taskIDfromCacheName(name); + hitRatioSensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, taskName, storeName); } synchronized final String name() { @@ -187,7 +191,7 @@ class NamedCache { return null; } else { numReadHits++; - namedCacheMetrics.hitRatioSensor.record((double) numReadHits / (double) (numReadHits + numReadMisses)); + hitRatioSensor.record((double) numReadHits / (double) (numReadHits + numReadMisses)); } return node; } @@ -311,7 +315,7 @@ class NamedCache { currentSizeBytes = 0; dirtyKeys.clear(); cache.clear(); - namedCacheMetrics.removeAllSensors(); + streamsMetrics.removeAllCacheLevelSensors(taskName, storeName); } /** @@ -357,68 +361,4 @@ class NamedCache { } } - private static class NamedCacheMetrics { - private final StreamsMetricsImpl metrics; - - private final Sensor hitRatioSensor; - private final String taskName; - private final String cacheName; - - private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) { - taskName = ThreadCache.taskIDfromCacheName(cacheName); - this.cacheName = cacheName; - this.metrics = metrics; - final String group = "stream-record-cache-metrics"; - - // add parent - final Map allMetricTags = metrics.tagMap( - "task-id", taskName, - "record-cache-id", "all" - ); - final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG); - taskLevelHitRatioSensor.add( - new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags), - new Avg() - ); - taskLevelHitRatioSensor.add( - new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags), - new Min() - ); - taskLevelHitRatioSensor.add( - new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags), - new Max() - ); - - // add child - final Map metricTags = metrics.tagMap( - "task-id", taskName, - "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName) - ); - - hitRatioSensor = metrics.cacheLevelSensor( - taskName, - cacheName, - "hitRatio", - Sensor.RecordingLevel.DEBUG, - taskLevelHitRatioSensor - ); - hitRatioSensor.add( - new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags), - new Avg() - ); - hitRatioSensor.add( - new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags), - new Min() - ); - hitRatioSensor.add( - new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags), - new Max() - ); - - } - - private void removeAllSensors() { - metrics.removeAllCacheLevelSensors(taskName, cacheName); - } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetrics.java new file mode 100644 index 00000000000..3744428ba58 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetrics.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CACHE_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version.FROM_100_TO_23; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMinAndMaxToSensor; + +public class NamedCacheMetrics { + private NamedCacheMetrics() {} + + private static final String HIT_RATIO_0100_TO_23 = "hitRatio"; + private static final String HIT_RATIO = "hit-ratio"; + private static final String HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio"; + private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio"; + private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio"; + + + public static Sensor hitRatioSensor(final StreamsMetricsImpl streamsMetrics, + final String taskName, + final String storeName) { + + final Sensor hitRatioSensor; + final String hitRatioName; + if (streamsMetrics.version() == FROM_100_TO_23) { + hitRatioName = HIT_RATIO_0100_TO_23; + final Sensor taskLevelHitRatioSensor = streamsMetrics.taskLevelSensor( + taskName, + hitRatioName, + Sensor.RecordingLevel.DEBUG + ); + addAvgAndMinAndMaxToSensor( + taskLevelHitRatioSensor, + CACHE_LEVEL_GROUP, + streamsMetrics.cacheLevelTagMap(taskName, ROLLUP_VALUE), + hitRatioName, + HIT_RATIO_AVG_DESCRIPTION, + HIT_RATIO_MIN_DESCRIPTION, + HIT_RATIO_MAX_DESCRIPTION + ); + hitRatioSensor = streamsMetrics.cacheLevelSensor( + taskName, + storeName, + hitRatioName, + Sensor.RecordingLevel.DEBUG, + taskLevelHitRatioSensor + ); + } else { + hitRatioName = HIT_RATIO; + hitRatioSensor = streamsMetrics.cacheLevelSensor( + taskName, + storeName, + hitRatioName, + Sensor.RecordingLevel.DEBUG + ); + } + addAvgAndMinAndMaxToSensor( + hitRatioSensor, + CACHE_LEVEL_GROUP, + streamsMetrics.cacheLevelTagMap(taskName, storeName), + hitRatioName, + HIT_RATIO_AVG_DESCRIPTION, + HIT_RATIO_MIN_DESCRIPTION, + HIT_RATIO_MAX_DESCRIPTION + ); + return hitRatioSensor; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 0104d024e98..cac33c980f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -151,9 +151,12 @@ public class MetricsIntegrationTest { private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total"; private static final String RECORD_LATENESS_AVG = "record-lateness-avg"; private static final String RECORD_LATENESS_MAX = "record-lateness-max"; - private static final String HIT_RATIO_AVG = "hitRatio-avg"; - private static final String HIT_RATIO_MIN = "hitRatio-min"; - private static final String HIT_RATIO_MAX = "hitRatio-max"; + private static final String HIT_RATIO_AVG_BEFORE_24 = "hitRatio-avg"; + private static final String HIT_RATIO_MIN_BEFORE_24 = "hitRatio-min"; + private static final String HIT_RATIO_MAX_BEFORE_24 = "hitRatio-max"; + private static final String HIT_RATIO_AVG = "hit-ratio-avg"; + private static final String HIT_RATIO_MIN = "hit-ratio-min"; + private static final String HIT_RATIO_MAX = "hit-ratio-max"; // RocksDB metrics private static final String BYTES_WRITTEN_RATE = "bytes-written-rate"; @@ -275,7 +278,18 @@ public class MetricsIntegrationTest { } @Test - public void shouldAddMetricsOnAllLevels() throws Exception { + public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsLatestVersion() throws Exception { + shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_LATEST); + } + + @Test + public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsVersion0100To23() throws Exception { + shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_0100_TO_23); + } + + private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) throws Exception { + streamsConfiguration.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); + builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String())); builder.table(STREAM_OUTPUT_1, @@ -299,7 +313,7 @@ public class MetricsIntegrationTest { checkKeyValueStoreMetricsByGroup(STREAM_STORE_ROCKSDB_STATE_METRICS); checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS); checkRocksDBMetricsByTag("rocksdb-state-id"); - checkCacheMetrics(); + checkCacheMetrics(builtInMetricsVersion); closeApplication(); @@ -526,13 +540,25 @@ public class MetricsIntegrationTest { assertThat(listMetricAfterClosingApp.size(), is(0)); } - private void checkCacheMetrics() { + private void checkCacheMetrics(final String builtInMetricsVersion) { final List listMetricCache = new ArrayList(kafkaStreams.metrics().values()).stream() .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)) .collect(Collectors.toList()); - checkMetricByName(listMetricCache, HIT_RATIO_AVG, 6); - checkMetricByName(listMetricCache, HIT_RATIO_MIN, 6); - checkMetricByName(listMetricCache, HIT_RATIO_MAX, 6); + checkMetricByName( + listMetricCache, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_AVG : HIT_RATIO_AVG_BEFORE_24, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */ + ); + checkMetricByName( + listMetricCache, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MIN : HIT_RATIO_MIN_BEFORE_24, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */ + ); + checkMetricByName( + listMetricCache, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MAX : HIT_RATIO_MAX_BEFORE_24, + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */ + ); } private void checkWindowStoreMetrics() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index f1d951bab96..b5891197661 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -62,6 +62,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport { private final Map tags = mkMap(mkEntry("tag", "value")); private final String description1 = "description number one"; private final String description2 = "description number two"; + private final String description3 = "description number three"; private final MockTime time = new MockTime(0); private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION); @@ -252,11 +253,40 @@ public class StreamsMetricsImplTest extends EasyMockSupport { final Map tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); assertThat(tagMap.size(), equalTo(3)); - assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(THREAD_NAME)); + assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23), equalTo(THREAD_NAME)); assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName)); assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName)); } + @Test + public void shouldGetCacheLevelTagMapForBuiltInMetricsLatestVersion() { + shouldGetCacheLevelTagMap(StreamsConfig.METRICS_LATEST); + } + + @Test + public void shouldGetCacheLevelTagMapForBuiltInMetricsVersion0100To23() { + shouldGetCacheLevelTagMap(StreamsConfig.METRICS_0100_TO_23); + } + + private void shouldGetCacheLevelTagMap(final String builtInMetricsVersion) { + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, THREAD_NAME, builtInMetricsVersion); + final String taskName = "taskName"; + final String storeName = "storeName"; + + final Map tagMap = streamsMetrics.cacheLevelTagMap(taskName, storeName); + + assertThat(tagMap.size(), equalTo(3)); + assertThat( + tagMap.get( + builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG + : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23), + equalTo(Thread.currentThread().getName()) + ); + assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName)); + assertThat(tagMap.get(StreamsMetricsImpl.RECORD_CACHE_ID_TAG), equalTo(storeName)); + } + @Test public void shouldAddAmountRateAndSum() { StreamsMetricsImpl @@ -325,6 +355,20 @@ public class StreamsMetricsImplTest extends EasyMockSupport { assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics } + @Test + public void shouldAddAvgAndMinAndMaxMetricsToSensor() { + StreamsMetricsImpl + .addAvgAndMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3); + + final double valueToRecord1 = 18.0; + final double valueToRecord2 = 42.0; + final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2; + verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue); + verifyMetric(metricNamePrefix + "-min", description2, valueToRecord1, valueToRecord2, valueToRecord1); + verifyMetric(metricNamePrefix + "-max", description3, valueToRecord1, valueToRecord2, valueToRecord2); + assertThat(metrics.metrics().size(), equalTo(3 + 1)); // one metric is added automatically in the constructor of Metrics + } + @Test public void shouldReturnMetricsVersionCurrent() { assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 6c82209c33a..ea52b4e82d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -29,20 +28,15 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; public class NamedCacheTest { @@ -61,7 +55,7 @@ public class NamedCacheTest { } @Test - public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOException { + public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() { final List> toInsert = Arrays.asList( new KeyValue<>("K1", "V1"), new KeyValue<>("K2", "V2"), @@ -83,31 +77,6 @@ public class NamedCacheTest { } } - @Test - public void testMetrics() { - final Map metricTags = new LinkedHashMap<>(); - metricTags.put("record-cache-id", underlyingStoreName); - metricTags.put("task-id", taskIDString); - metricTags.put("client-id", "test"); - - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags); - - // test "all" - metricTags.put("record-cache-id", "all"); - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags); - - final JmxReporter reporter = new JmxReporter("kafka.streams"); - innerMetrics.addReporter(reporter); - assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s", - taskIDString, underlyingStoreName))); - assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s", - taskIDString, "all"))); - } - @Test public void shouldKeepTrackOfSize() { final LRUCacheEntry value = new LRUCacheEntry(new byte[]{0}); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java new file mode 100644 index 00000000000..46ea302738b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.verify; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({StreamsMetricsImpl.class, Sensor.class}) +public class NamedCacheMetricsTest { + + private static final String TASK_NAME = "taskName"; + private static final String STORE_NAME = "storeName"; + private static final String HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio"; + private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio"; + private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio"; + + private final StreamsMetricsImpl streamsMetrics = createMock(StreamsMetricsImpl.class); + private final Sensor expectedSensor = mock(Sensor.class); + private final Map tagMap = mkMap(mkEntry("key", "value")); + + @Test + public void shouldGetHitRatioSensorWithBuiltInMetricsVersionCurrent() { + final String hitRatio = "hit-ratio"; + mockStatic(StreamsMetricsImpl.class); + setUpStreamsMetrics(Version.LATEST, hitRatio); + replay(streamsMetrics); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, TASK_NAME, STORE_NAME); + + verifyResult(sensor); + } + + @Test + public void shouldGetHitRatioSensorWithBuiltInMetricsVersionBefore24() { + final Map parentTagMap = mkMap(mkEntry("key", "all")); + final String hitRatio = "hitRatio"; + final RecordingLevel recordingLevel = RecordingLevel.DEBUG; + mockStatic(StreamsMetricsImpl.class); + final Sensor parentSensor = mock(Sensor.class); + expect(streamsMetrics.taskLevelSensor(TASK_NAME, hitRatio, recordingLevel)).andReturn(parentSensor); + expect(streamsMetrics.cacheLevelTagMap(TASK_NAME, StreamsMetricsImpl.ROLLUP_VALUE)).andReturn(parentTagMap); + StreamsMetricsImpl.addAvgAndMinAndMaxToSensor( + parentSensor, + StreamsMetricsImpl.CACHE_LEVEL_GROUP, + parentTagMap, + hitRatio, + HIT_RATIO_AVG_DESCRIPTION, + HIT_RATIO_MIN_DESCRIPTION, + HIT_RATIO_MAX_DESCRIPTION); + setUpStreamsMetrics(Version.FROM_100_TO_23, hitRatio, parentSensor); + replay(streamsMetrics); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, TASK_NAME, STORE_NAME); + + verifyResult(sensor); + } + + private void setUpStreamsMetrics(final Version builtInMetricsVersion, + final String hitRatio, + final Sensor... parents) { + expect(streamsMetrics.version()).andReturn(builtInMetricsVersion); + expect(streamsMetrics.cacheLevelSensor(TASK_NAME, STORE_NAME, hitRatio, RecordingLevel.DEBUG, parents)) + .andReturn(expectedSensor); + expect(streamsMetrics.cacheLevelTagMap(TASK_NAME, STORE_NAME)).andReturn(tagMap); + StreamsMetricsImpl.addAvgAndMinAndMaxToSensor( + expectedSensor, + StreamsMetricsImpl.CACHE_LEVEL_GROUP, + tagMap, + hitRatio, + HIT_RATIO_AVG_DESCRIPTION, + HIT_RATIO_MIN_DESCRIPTION, + HIT_RATIO_MAX_DESCRIPTION); + } + + private void verifyResult(final Sensor sensor) { + verify(streamsMetrics); + verify(StreamsMetricsImpl.class); + assertThat(sensor, is(expectedSensor)); + } +} \ No newline at end of file