KAFKA-8859: Refactor cache-level metrics (#7367)

Cache-level metrics are refactored according to KIP-444:

tag client-id is renamed to thread-id
metric name hitRatio is renamed to hit-ratio
made backward compatible via the Streams config built.in.metrics.version (see the sketch below)
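
A minimal configuration sketch (application id and bootstrap servers are placeholders) for opting back into the pre-2.4 cache metrics:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsVersionExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Keep the pre-2.4 cache metrics (client-id tag, hitRatio-* metric names):
        props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_0100_TO_23);
        // The default, StreamsConfig.METRICS_LATEST, emits the thread-id tag and hit-ratio-* names.
    }
}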

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
Bruno Cadonna 2019-09-23 07:12:45 -07:00 committed by Bill Bejeck
parent d91a94e7bf
commit e98e239a0c
7 changed files with 367 additions and 130 deletions

StreamsMetricsImpl.java

@@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
@@ -63,9 +64,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private static final String SENSOR_PREFIX_DELIMITER = ".";
private static final String SENSOR_NAME_DELIMITER = ".s.";
public static final String THREAD_ID_TAG = "client-id";
public static final String THREAD_ID_TAG = "thread-id";
public static final String THREAD_ID_TAG_0100_TO_23 = "client-id";
public static final String TASK_ID_TAG = "task-id";
public static final String STORE_ID_TAG = "state-id";
public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
public static final String ROLLUP_VALUE = "all";
@@ -77,6 +80,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String TOTAL_SUFFIX = "-total";
public static final String RATIO_SUFFIX = "-ratio";
public static final String AVG_VALUE_DOC = "The average value of ";
public static final String MAX_VALUE_DOC = "The maximum value of ";
public static final String AVG_LATENCY_DOC = "The average latency of ";
public static final String MAX_LATENCY_DOC = "The maximum latency of ";
public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
public static final String GROUP_SUFFIX = "-metrics";
@@ -84,6 +92,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
public static final String STATE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX;
public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -130,7 +139,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public Map<String, String> threadLevelTagMap() {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadName);
tagMap.put(THREAD_ID_TAG_0100_TO_23, threadName);
return tagMap;
}
@@ -237,12 +246,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "node" + SENSOR_PREFIX_DELIMITER + processorNodeName;
}
public final Sensor cacheLevelSensor(final String taskName,
final String cacheName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = cacheSensorPrefix(taskName, cacheName);
public Sensor cacheLevelSensor(final String taskName,
final String storeName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = cacheSensorPrefix(taskName, storeName);
synchronized (cacheLevelSensors) {
if (!cacheLevelSensors.containsKey(key)) {
cacheLevelSensors.put(key, new LinkedList<>());
@@ -258,6 +267,18 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
public Map<String, String> cacheLevelTagMap(final String taskName, final String storeName) {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(TASK_ID_TAG, taskName);
tagMap.put(RECORD_CACHE_ID_TAG, storeName);
if (version == Version.FROM_100_TO_23) {
tagMap.put(THREAD_ID_TAG_0100_TO_23, Thread.currentThread().getName());
} else {
tagMap.put(THREAD_ID_TAG, Thread.currentThread().getName());
}
return tagMap;
}
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
final String key = cacheSensorPrefix(taskName, cacheName);
synchronized (cacheLevelSensors) {
@@ -423,15 +444,17 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public static void addAvgAndMaxToSensor(final Sensor sensor,
private static void addAvgAndMaxToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation) {
final String operation,
final String descriptionOfAvg,
final String descriptionOfMax) {
sensor.add(
new MetricName(
operation + AVG_SUFFIX,
group,
"The average value of " + operation + ".",
descriptionOfAvg,
tags),
new Avg()
);
@@ -439,12 +462,26 @@ public class StreamsMetricsImpl implements StreamsMetrics {
new MetricName(
operation + MAX_SUFFIX,
group,
"The max value of " + operation + ".",
descriptionOfMax,
tags),
new Max()
);
}
public static void addAvgAndMaxToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation) {
addAvgAndMaxToSensor(
sensor,
group,
tags,
operation,
AVG_VALUE_DOC + operation + ".",
MAX_VALUE_DOC + operation + "."
);
}
public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
@@ -453,7 +490,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
new MetricName(
operation + "-latency-avg",
group,
"The average latency of " + operation + " operation.",
AVG_LATENCY_DOC + operation + " operation.",
tags),
new Avg()
);
@@ -461,12 +498,30 @@ public class StreamsMetricsImpl implements StreamsMetrics {
new MetricName(
operation + "-latency-max",
group,
"The max latency of " + operation + " operation.",
MAX_LATENCY_DOC + operation + " operation.",
tags),
new Max()
);
}
public static void addAvgAndMinAndMaxToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation,
final String descriptionOfAvg,
final String descriptionOfMin,
final String descriptionOfMax) {
addAvgAndMaxToSensor(sensor, group, tags, operation, descriptionOfAvg, descriptionOfMax);
sensor.add(
new MetricName(
operation + MIN_SUFFIX,
group,
descriptionOfMin,
tags),
new Min()
);
}
public static void addInvocationRateAndCountToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,

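An illustrative sketch (thread, task, and store names are placeholders) of the tag maps the new cacheLevelTagMap produces; note the thread tag always carries the current thread's name:

import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class CacheLevelTagMapExample {
    public static void main(final String[] args) {
        // Constructor shape as used in StreamsMetricsImplTest below; thread name is a placeholder.
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(new Metrics(), "my-app-StreamThread-1", StreamsConfig.METRICS_LATEST);
        final Map<String, String> tags = streamsMetrics.cacheLevelTagMap("0_0", "myStore");
        System.out.println(tags);
        // METRICS_LATEST      -> {task-id=0_0, record-cache-id=myStore, thread-id=<current thread name>}
        // METRICS_0100_TO_23  -> {task-id=0_0, record-cache-id=myStore, client-id=<current thread name>}
    }
}
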
NamedCache.java

@@ -19,14 +19,12 @@ package org.apache.kafka.streams.state.internals;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.NamedCacheMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,19 +32,22 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
class NamedCache {
private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
private final String name;
private final String storeName;
private final String taskName;
private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
private ThreadCache.DirtyEntryFlushListener listener;
private LRUNode tail;
private LRUNode head;
private long currentSizeBytes;
private final NamedCacheMetrics namedCacheMetrics;
private final StreamsMetricsImpl streamsMetrics;
private final Sensor hitRatioSensor;
// internal stats
private long numReadHits = 0;
@@ -54,9 +55,12 @@
private long numOverwrites = 0;
private long numFlushes = 0;
NamedCache(final String name, final StreamsMetricsImpl metrics) {
NamedCache(final String name, final StreamsMetricsImpl streamsMetrics) {
this.name = name;
this.namedCacheMetrics = new NamedCacheMetrics(metrics, name);
this.streamsMetrics = streamsMetrics;
storeName = ThreadCache.underlyingStoreNamefromCacheName(name);
taskName = ThreadCache.taskIDfromCacheName(name);
hitRatioSensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, taskName, storeName);
}
synchronized final String name() {
@@ -187,7 +191,7 @@
return null;
} else {
numReadHits++;
namedCacheMetrics.hitRatioSensor.record((double) numReadHits / (double) (numReadHits + numReadMisses));
hitRatioSensor.record((double) numReadHits / (double) (numReadHits + numReadMisses));
}
return node;
}
@@ -311,7 +315,7 @@
currentSizeBytes = 0;
dirtyKeys.clear();
cache.clear();
namedCacheMetrics.removeAllSensors();
streamsMetrics.removeAllCacheLevelSensors(taskName, storeName);
}
/**
@@ -357,68 +361,4 @@
}
}
private static class NamedCacheMetrics {
private final StreamsMetricsImpl metrics;
private final Sensor hitRatioSensor;
private final String taskName;
private final String cacheName;
private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) {
taskName = ThreadCache.taskIDfromCacheName(cacheName);
this.cacheName = cacheName;
this.metrics = metrics;
final String group = "stream-record-cache-metrics";
// add parent
final Map<String, String> allMetricTags = metrics.tagMap(
"task-id", taskName,
"record-cache-id", "all"
);
final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
new Avg()
);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags),
new Min()
);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags),
new Max()
);
// add child
final Map<String, String> metricTags = metrics.tagMap(
"task-id", taskName,
"record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName)
);
hitRatioSensor = metrics.cacheLevelSensor(
taskName,
cacheName,
"hitRatio",
Sensor.RecordingLevel.DEBUG,
taskLevelHitRatioSensor
);
hitRatioSensor.add(
new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags),
new Avg()
);
hitRatioSensor.add(
new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags),
new Min()
);
hitRatioSensor.add(
new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags),
new Max()
);
}
private void removeAllSensors() {
metrics.removeAllCacheLevelSensors(taskName, cacheName);
}
}
}

NamedCacheMetrics.java (new file)

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CACHE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version.FROM_100_TO_23;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMinAndMaxToSensor;
public class NamedCacheMetrics {
private NamedCacheMetrics() {}
private static final String HIT_RATIO_0100_TO_23 = "hitRatio";
private static final String HIT_RATIO = "hit-ratio";
private static final String HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio";
private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio";
private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio";
public static Sensor hitRatioSensor(final StreamsMetricsImpl streamsMetrics,
final String taskName,
final String storeName) {
final Sensor hitRatioSensor;
final String hitRatioName;
if (streamsMetrics.version() == FROM_100_TO_23) {
hitRatioName = HIT_RATIO_0100_TO_23;
final Sensor taskLevelHitRatioSensor = streamsMetrics.taskLevelSensor(
taskName,
hitRatioName,
Sensor.RecordingLevel.DEBUG
);
addAvgAndMinAndMaxToSensor(
taskLevelHitRatioSensor,
CACHE_LEVEL_GROUP,
streamsMetrics.cacheLevelTagMap(taskName, ROLLUP_VALUE),
hitRatioName,
HIT_RATIO_AVG_DESCRIPTION,
HIT_RATIO_MIN_DESCRIPTION,
HIT_RATIO_MAX_DESCRIPTION
);
hitRatioSensor = streamsMetrics.cacheLevelSensor(
taskName,
storeName,
hitRatioName,
Sensor.RecordingLevel.DEBUG,
taskLevelHitRatioSensor
);
} else {
hitRatioName = HIT_RATIO;
hitRatioSensor = streamsMetrics.cacheLevelSensor(
taskName,
storeName,
hitRatioName,
Sensor.RecordingLevel.DEBUG
);
}
addAvgAndMinAndMaxToSensor(
hitRatioSensor,
CACHE_LEVEL_GROUP,
streamsMetrics.cacheLevelTagMap(taskName, storeName),
hitRatioName,
HIT_RATIO_AVG_DESCRIPTION,
HIT_RATIO_MIN_DESCRIPTION,
HIT_RATIO_MAX_DESCRIPTION
);
return hitRatioSensor;
}
}
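
A hedged sketch of the net effect on exported JMX object names (thread, task, and store values are placeholders; the pre-2.4 pattern mirrors the assertion removed from NamedCacheTest below, and tag order follows the tag map's insertion order):

public class CacheMetricNamesExample {
    public static void main(final String[] args) {
        // Pre-2.4 (METRICS_0100_TO_23): exposes hitRatio-avg/-min/-max under:
        final String before24 = "kafka.streams:type=stream-record-cache-metrics,"
            + "client-id=my-app-StreamThread-1,task-id=0_0,record-cache-id=myStore";
        // Current (METRICS_LATEST): exposes hit-ratio-avg/-min/-max under (sketch):
        final String latest = "kafka.streams:type=stream-record-cache-metrics,"
            + "task-id=0_0,record-cache-id=myStore,thread-id=my-app-StreamThread-1";
        System.out.println(before24);
        System.out.println(latest);
    }
}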

MetricsIntegrationTest.java

@@ -151,9 +151,12 @@ public class MetricsIntegrationTest {
private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
private static final String RECORD_LATENESS_MAX = "record-lateness-max";
private static final String HIT_RATIO_AVG = "hitRatio-avg";
private static final String HIT_RATIO_MIN = "hitRatio-min";
private static final String HIT_RATIO_MAX = "hitRatio-max";
private static final String HIT_RATIO_AVG_BEFORE_24 = "hitRatio-avg";
private static final String HIT_RATIO_MIN_BEFORE_24 = "hitRatio-min";
private static final String HIT_RATIO_MAX_BEFORE_24 = "hitRatio-max";
private static final String HIT_RATIO_AVG = "hit-ratio-avg";
private static final String HIT_RATIO_MIN = "hit-ratio-min";
private static final String HIT_RATIO_MAX = "hit-ratio-max";
// RocksDB metrics
private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
@@ -275,7 +278,18 @@
}
@Test
public void shouldAddMetricsOnAllLevels() throws Exception {
public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsLatestVersion() throws Exception {
shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_LATEST);
}
@Test
public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsVersion0100To23() throws Exception {
shouldAddMetricsOnAllLevels(StreamsConfig.METRICS_0100_TO_23);
}
private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) throws Exception {
streamsConfiguration.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
builder.table(STREAM_OUTPUT_1,
@@ -299,7 +313,7 @@
checkKeyValueStoreMetricsByGroup(STREAM_STORE_ROCKSDB_STATE_METRICS);
checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
checkRocksDBMetricsByTag("rocksdb-state-id");
checkCacheMetrics();
checkCacheMetrics(builtInMetricsVersion);
closeApplication();
@@ -526,13 +540,25 @@
assertThat(listMetricAfterClosingApp.size(), is(0));
}
private void checkCacheMetrics() {
private void checkCacheMetrics(final String builtInMetricsVersion) {
final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS))
.collect(Collectors.toList());
checkMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
checkMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
checkMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
checkMetricByName(
listMetricCache,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_AVG : HIT_RATIO_AVG_BEFORE_24,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */
);
checkMetricByName(
listMetricCache,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MIN : HIT_RATIO_MIN_BEFORE_24,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */
);
checkMetricByName(
listMetricCache,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? HIT_RATIO_MAX : HIT_RATIO_MAX_BEFORE_24,
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? 3 : 6 /* includes parent sensors */
);
}
private void checkWindowStoreMetrics() {

StreamsMetricsImplTest.java

@@ -62,6 +62,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
private final String description1 = "description number one";
private final String description2 = "description number two";
private final String description3 = "description number three";
private final MockTime time = new MockTime(0);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION);
@@ -252,11 +253,40 @@
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
assertThat(tagMap.size(), equalTo(3));
assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(THREAD_NAME));
assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23), equalTo(THREAD_NAME));
assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName));
assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName));
}
@Test
public void shouldGetCacheLevelTagMapForBuiltInMetricsLatestVersion() {
shouldGetCacheLevelTagMap(StreamsConfig.METRICS_LATEST);
}
@Test
public void shouldGetCacheLevelTagMapForBuiltInMetricsVersion0100To23() {
shouldGetCacheLevelTagMap(StreamsConfig.METRICS_0100_TO_23);
}
private void shouldGetCacheLevelTagMap(final String builtInMetricsVersion) {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, THREAD_NAME, builtInMetricsVersion);
final String taskName = "taskName";
final String storeName = "storeName";
final Map<String, String> tagMap = streamsMetrics.cacheLevelTagMap(taskName, storeName);
assertThat(tagMap.size(), equalTo(3));
assertThat(
tagMap.get(
builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG
: StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23),
equalTo(Thread.currentThread().getName())
);
assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName));
assertThat(tagMap.get(StreamsMetricsImpl.RECORD_CACHE_ID_TAG), equalTo(storeName));
}
@Test
public void shouldAddAmountRateAndSum() {
StreamsMetricsImpl
@@ -325,6 +355,20 @@
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddAvgAndMinAndMaxMetricsToSensor() {
StreamsMetricsImpl
.addAvgAndMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2;
verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
verifyMetric(metricNamePrefix + "-min", description2, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", description3, valueToRecord1, valueToRecord2, valueToRecord2);
assertThat(metrics.metrics().size(), equalTo(3 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(

NamedCacheTest.java

@@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -29,20 +28,15 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class NamedCacheTest {
@@ -61,7 +55,7 @@
}
@Test
public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOException {
public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() {
final List<KeyValue<String, String>> toInsert = Arrays.asList(
new KeyValue<>("K1", "V1"),
new KeyValue<>("K2", "V2"),
@@ -83,31 +77,6 @@
}
}
@Test
public void testMetrics() {
final Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("record-cache-id", underlyingStoreName);
metricTags.put("task-id", taskIDString);
metricTags.put("client-id", "test");
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
// test "all"
metricTags.put("record-cache-id", "all");
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
final JmxReporter reporter = new JmxReporter("kafka.streams");
innerMetrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s",
taskIDString, underlyingStoreName)));
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-record-cache-metrics,client-id=test,task-id=%s,record-cache-id=%s",
taskIDString, "all")));
}
@Test
public void shouldKeepTrackOfSize() {
final LRUCacheEntry value = new LRUCacheEntry(new byte[]{0});

NamedCacheMetricsTest.java (new file)

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.mockStatic;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamsMetricsImpl.class, Sensor.class})
public class NamedCacheMetricsTest {
private static final String TASK_NAME = "taskName";
private static final String STORE_NAME = "storeName";
private static final String HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio";
private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio";
private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio";
private final StreamsMetricsImpl streamsMetrics = createMock(StreamsMetricsImpl.class);
private final Sensor expectedSensor = mock(Sensor.class);
private final Map<String, String> tagMap = mkMap(mkEntry("key", "value"));
@Test
public void shouldGetHitRatioSensorWithBuiltInMetricsVersionCurrent() {
final String hitRatio = "hit-ratio";
mockStatic(StreamsMetricsImpl.class);
setUpStreamsMetrics(Version.LATEST, hitRatio);
replay(streamsMetrics);
replay(StreamsMetricsImpl.class);
final Sensor sensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, TASK_NAME, STORE_NAME);
verifyResult(sensor);
}
@Test
public void shouldGetHitRatioSensorWithBuiltInMetricsVersionBefore24() {
final Map<String, String> parentTagMap = mkMap(mkEntry("key", "all"));
final String hitRatio = "hitRatio";
final RecordingLevel recordingLevel = RecordingLevel.DEBUG;
mockStatic(StreamsMetricsImpl.class);
final Sensor parentSensor = mock(Sensor.class);
expect(streamsMetrics.taskLevelSensor(TASK_NAME, hitRatio, recordingLevel)).andReturn(parentSensor);
expect(streamsMetrics.cacheLevelTagMap(TASK_NAME, StreamsMetricsImpl.ROLLUP_VALUE)).andReturn(parentTagMap);
StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
parentSensor,
StreamsMetricsImpl.CACHE_LEVEL_GROUP,
parentTagMap,
hitRatio,
HIT_RATIO_AVG_DESCRIPTION,
HIT_RATIO_MIN_DESCRIPTION,
HIT_RATIO_MAX_DESCRIPTION);
setUpStreamsMetrics(Version.FROM_100_TO_23, hitRatio, parentSensor);
replay(streamsMetrics);
replay(StreamsMetricsImpl.class);
final Sensor sensor = NamedCacheMetrics.hitRatioSensor(streamsMetrics, TASK_NAME, STORE_NAME);
verifyResult(sensor);
}
private void setUpStreamsMetrics(final Version builtInMetricsVersion,
final String hitRatio,
final Sensor... parents) {
expect(streamsMetrics.version()).andReturn(builtInMetricsVersion);
expect(streamsMetrics.cacheLevelSensor(TASK_NAME, STORE_NAME, hitRatio, RecordingLevel.DEBUG, parents))
.andReturn(expectedSensor);
expect(streamsMetrics.cacheLevelTagMap(TASK_NAME, STORE_NAME)).andReturn(tagMap);
StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
expectedSensor,
StreamsMetricsImpl.CACHE_LEVEL_GROUP,
tagMap,
hitRatio,
HIT_RATIO_AVG_DESCRIPTION,
HIT_RATIO_MIN_DESCRIPTION,
HIT_RATIO_MAX_DESCRIPTION);
}
private void verifyResult(final Sensor sensor) {
verify(streamsMetrics);
verify(StreamsMetricsImpl.class);
assertThat(sensor, is(expectedSensor));
}
}