MINOR: Refactor tag key for store level metrics (#7257)

The tag key for store level metrics specified in StreamsMetricsImpl
is unified with the tag keys on thread and task level.

Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
Bruno Cadonna 2019-08-30 15:46:07 +02:00 committed by Bill Bejeck
parent fb381cb6c7
commit d18d6b033e
16 changed files with 69 additions and 60 deletions

View File

@ -57,9 +57,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String THREAD_ID_TAG = "client-id"; public static final String THREAD_ID_TAG = "client-id";
public static final String TASK_ID_TAG = "task-id"; public static final String TASK_ID_TAG = "task-id";
public static final String STORE_ID_TAG = "id"; public static final String STORE_ID_TAG = "state-id";
public static final String ALL_TASKS = "all"; public static final String ROLLUP_VALUE = "all";
public static final String LATENCY_SUFFIX = "-latency"; public static final String LATENCY_SUFFIX = "-latency";
public static final String AVG_SUFFIX = "-avg"; public static final String AVG_SUFFIX = "-avg";
@ -69,9 +69,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String TOTAL_SUFFIX = "-total"; public static final String TOTAL_SUFFIX = "-total";
public static final String RATIO_SUFFIX = "-ratio"; public static final String RATIO_SUFFIX = "-ratio";
public static final String THREAD_LEVEL_GROUP = "stream-metrics"; public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
public static final String TASK_LEVEL_GROUP = "stream-task-metrics"; public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
public static final String STATE_LEVEL_GROUP = "stream-state-metrics"; public static final String GROUP_SUFFIX = "-metrics";
public static final String STATE_LEVEL_GROUP_SUFFIX = "-state" + GROUP_SUFFIX;
public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
public static final String STATE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";

View File

@ -21,7 +21,7 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
@ -162,7 +162,7 @@ public class ThreadMetrics {
public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) { public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG); final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ROLLUP_VALUE);
addAvgAndMaxToSensor(commitOverTasksSensor, addAvgAndMaxToSensor(commitOverTasksSensor,
TASK_LEVEL_GROUP, TASK_LEVEL_GROUP,
tagMap, tagMap,

View File

@ -135,7 +135,7 @@ public final class Stores {
@Override @Override
public String metricsScope() { public String metricsScope() {
return "in-memory-state"; return "in-memory";
} }
}; };
} }
@ -169,7 +169,7 @@ public final class Stores {
@Override @Override
public String metricsScope() { public String metricsScope() {
return "in-memory-lru-state"; return "in-memory-lru";
} }
}; };
} }

View File

@ -42,7 +42,7 @@ public class InMemorySessionBytesStoreSupplier implements SessionBytesStoreSuppl
@Override @Override
public String metricsScope() { public String metricsScope() {
return "in-memory-session-state"; return "in-memory-session";
} }
// In-memory store is not *really* segmented, so just say it is 1 (for ordering consistency with caching enabled) // In-memory store is not *really* segmented, so just say it is 1 (for ordering consistency with caching enabled)

View File

@ -52,7 +52,7 @@ public class InMemoryWindowBytesStoreSupplier implements WindowBytesStoreSupplie
@Override @Override
public String metricsScope() { public String metricsScope() {
return "in-memory-window-state"; return "in-memory-window";
} }
@Deprecated @Deprecated

View File

@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
/** /**
@ -53,7 +54,7 @@ public class MeteredKeyValueStore<K, V>
final Serde<V> valueSerde; final Serde<V> valueSerde;
StateSerdes<K, V> serdes; StateSerdes<K, V> serdes;
private final String metricScope; private final String metricsScope;
protected final Time time; protected final Time time;
private Sensor putTime; private Sensor putTime;
private Sensor putIfAbsentTime; private Sensor putIfAbsentTime;
@ -67,12 +68,12 @@ public class MeteredKeyValueStore<K, V>
private String taskName; private String taskName;
MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner, MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricScope, final String metricsScope,
final Time time, final Time time,
final Serde<K> keySerde, final Serde<K> keySerde,
final Serde<V> valueSerde) { final Serde<V> valueSerde) {
super(inner); super(inner);
this.metricScope = metricScope; this.metricsScope = metricsScope;
this.time = time != null ? time : Time.SYSTEM; this.time = time != null ? time : Time.SYSTEM;
this.keySerde = keySerde; this.keySerde = keySerde;
this.valueSerde = valueSerde; this.valueSerde = valueSerde;
@ -84,9 +85,9 @@ public class MeteredKeyValueStore<K, V>
metrics = (StreamsMetricsImpl) context.metrics(); metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString(); taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics"; final String metricsGroup = "stream-" + metricsScope + "-state-metrics";
final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all"); final Map<String, String> taskTags = metrics.storeLevelTagMap(taskName, metricsScope, ROLLUP_VALUE);
final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name()); final Map<String, String> storeTags = metrics.storeLevelTagMap(taskName, metricsScope, name());
initStoreSerde(context); initStoreSerde(context);

View File

@ -34,13 +34,14 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
public class MeteredSessionStore<K, V> public class MeteredSessionStore<K, V>
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V> extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
implements SessionStore<K, V> { implements SessionStore<K, V> {
private final String metricScope; private final String metricsScope;
private final Serde<K> keySerde; private final Serde<K> keySerde;
private final Serde<V> valueSerde; private final Serde<V> valueSerde;
private final Time time; private final Time time;
@ -53,12 +54,12 @@ public class MeteredSessionStore<K, V>
private String taskName; private String taskName;
MeteredSessionStore(final SessionStore<Bytes, byte[]> inner, MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
final String metricScope, final String metricsScope,
final Serde<K> keySerde, final Serde<K> keySerde,
final Serde<V> valueSerde, final Serde<V> valueSerde,
final Time time) { final Time time) {
super(inner); super(inner);
this.metricScope = metricScope; this.metricsScope = metricsScope;
this.keySerde = keySerde; this.keySerde = keySerde;
this.valueSerde = valueSerde; this.valueSerde = valueSerde;
this.time = time; this.time = time;
@ -76,9 +77,9 @@ public class MeteredSessionStore<K, V>
metrics = (StreamsMetricsImpl) context.metrics(); metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString(); taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics"; final String metricsGroup = "stream-" + metricsScope + "-state-metrics";
final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all"); final Map<String, String> taskTags = metrics.storeLevelTagMap(taskName, metricsScope, ROLLUP_VALUE);
final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name()); final Map<String, String> storeTags = metrics.storeLevelTagMap(taskName, metricsScope, name());
putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags); putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags); fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);

View File

@ -34,6 +34,9 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.GROUP_PREFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_LEVEL_GROUP_SUFFIX;
import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
public class MeteredWindowStore<K, V> public class MeteredWindowStore<K, V>
@ -41,7 +44,7 @@ public class MeteredWindowStore<K, V>
implements WindowStore<K, V> { implements WindowStore<K, V> {
private final long windowSizeMs; private final long windowSizeMs;
private final String metricScope; private final String metricsScope;
private final Time time; private final Time time;
final Serde<K> keySerde; final Serde<K> keySerde;
final Serde<V> valueSerde; final Serde<V> valueSerde;
@ -55,13 +58,13 @@ public class MeteredWindowStore<K, V>
MeteredWindowStore(final WindowStore<Bytes, byte[]> inner, MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
final long windowSizeMs, final long windowSizeMs,
final String metricScope, final String metricsScope,
final Time time, final Time time,
final Serde<K> keySerde, final Serde<K> keySerde,
final Serde<V> valueSerde) { final Serde<V> valueSerde) {
super(inner); super(inner);
this.windowSizeMs = windowSizeMs; this.windowSizeMs = windowSizeMs;
this.metricScope = metricScope; this.metricsScope = metricsScope;
this.time = time; this.time = time;
this.keySerde = keySerde; this.keySerde = keySerde;
this.valueSerde = valueSerde; this.valueSerde = valueSerde;
@ -75,9 +78,9 @@ public class MeteredWindowStore<K, V>
metrics = (StreamsMetricsImpl) context.metrics(); metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString(); taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics"; final String metricsGroup = GROUP_PREFIX + metricsScope + STATE_LEVEL_GROUP_SUFFIX;
final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all"); final Map<String, String> taskTags = metrics.storeLevelTagMap(taskName, metricsScope, ROLLUP_VALUE);
final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name()); final Map<String, String> storeTags = metrics.storeLevelTagMap(taskName, metricsScope, name());
putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags); putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags); fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);

View File

@ -45,6 +45,6 @@ public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupp
@Override @Override
public String metricsScope() { public String metricsScope() {
return "rocksdb-state"; return "rocksdb";
} }
} }

View File

@ -48,7 +48,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
@Override @Override
public String metricsScope() { public String metricsScope() {
return "rocksdb-session-state"; return "rocksdb-session";
} }
@Override @Override

View File

@ -74,7 +74,7 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
@Override @Override
public String metricsScope() { public String metricsScope() {
return "rocksdb-window-state"; return "rocksdb-window";
} }
@Deprecated @Deprecated

View File

@ -27,7 +27,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -225,7 +225,7 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of commit calls over all tasks"; final String rateDescription = "The average per-second number of commit calls over all tasks";
mockStatic(StreamsMetricsImpl.class); mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap); expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ROLLUP_VALUE)).andReturn(dummyTagMap);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
StreamsMetricsImpl.addAvgAndMaxToSensor( StreamsMetricsImpl.addAvgAndMaxToSensor(

View File

@ -65,7 +65,7 @@ public class MeteredKeyValueStoreTest {
private final Map<String, String> tags = mkMap( private final Map<String, String> tags = mkMap(
mkEntry("client-id", "test"), mkEntry("client-id", "test"),
mkEntry("task-id", taskId.toString()), mkEntry("task-id", taskId.toString()),
mkEntry("scope-id", "metered") mkEntry("scope-state-id", "metered")
); );
@Mock(type = MockType.NICE) @Mock(type = MockType.NICE)
private KeyValueStore<Bytes, byte[]> inner; private KeyValueStore<Bytes, byte[]> inner;
@ -105,9 +105,9 @@ public class MeteredKeyValueStoreTest {
init(); init();
final JmxReporter reporter = new JmxReporter("kafka.streams"); final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter); metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "metered"))); "scope", "test", taskId.toString(), "scope", "metered")));
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "all"))); "scope", "test", taskId.toString(), "scope", "all")));
} }
@ -149,7 +149,7 @@ public class MeteredKeyValueStoreTest {
} }
private KafkaMetric metric(final String name) { private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags)); return this.metrics.metric(new MetricName(name, "stream-scope-state-metrics", "", this.tags));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -204,7 +204,7 @@ public class MeteredKeyValueStoreTest {
assertFalse(iterator.hasNext()); assertFalse(iterator.hasNext());
iterator.close(); iterator.close();
final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags)); final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-state-metrics", "", tags));
assertTrue((Double) metric.metricValue() > 0); assertTrue((Double) metric.metricValue() > 0);
verify(inner); verify(inner);
} }

View File

@ -67,7 +67,7 @@ public class MeteredSessionStoreTest {
private final Map<String, String> tags = mkMap( private final Map<String, String> tags = mkMap(
mkEntry("client-id", "test"), mkEntry("client-id", "test"),
mkEntry("task-id", taskId.toString()), mkEntry("task-id", taskId.toString()),
mkEntry("scope-id", "metered") mkEntry("scope-state-id", "metered")
); );
private final Metrics metrics = new Metrics(); private final Metrics metrics = new Metrics();
private MeteredSessionStore<String, String> metered; private MeteredSessionStore<String, String> metered;
@ -104,9 +104,9 @@ public class MeteredSessionStoreTest {
init(); init();
final JmxReporter reporter = new JmxReporter("kafka.streams"); final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter); metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "metered"))); "scope", "test", taskId.toString(), "scope", "metered")));
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "all"))); "scope", "test", taskId.toString(), "scope", "all")));
} }
@ -287,7 +287,7 @@ public class MeteredSessionStoreTest {
} }
private KafkaMetric metric(final String name) { private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags)); return this.metrics.metric(new MetricName(name, "stream-scope-state-metrics", "", this.tags));
} }
} }

View File

@ -68,7 +68,7 @@ public class MeteredTimestampedKeyValueStoreTest {
private final Map<String, String> tags = mkMap( private final Map<String, String> tags = mkMap(
mkEntry("client-id", "test"), mkEntry("client-id", "test"),
mkEntry("task-id", taskId.toString()), mkEntry("task-id", taskId.toString()),
mkEntry("scope-id", "metered") mkEntry("scope-state-id", "metered")
); );
@Mock(type = MockType.NICE) @Mock(type = MockType.NICE)
private KeyValueStore<Bytes, byte[]> inner; private KeyValueStore<Bytes, byte[]> inner;
@ -110,9 +110,9 @@ public class MeteredTimestampedKeyValueStoreTest {
init(); init();
final JmxReporter reporter = new JmxReporter("kafka.streams"); final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter); metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "metered"))); "scope", "test", taskId.toString(), "scope", "metered")));
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", taskId.toString(), "scope", "all"))); "scope", "test", taskId.toString(), "scope", "all")));
} }
@Test @Test
@ -153,7 +153,7 @@ public class MeteredTimestampedKeyValueStoreTest {
} }
private KafkaMetric metric(final String name) { private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags)); return this.metrics.metric(new MetricName(name, "stream-scope-state-metrics", "", this.tags));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -209,7 +209,7 @@ public class MeteredTimestampedKeyValueStoreTest {
assertFalse(iterator.hasNext()); assertFalse(iterator.hasNext());
iterator.close(); iterator.close();
final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags)); final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-state-metrics", "", tags));
assertTrue((Double) metric.metricValue() > 0); assertTrue((Double) metric.metricValue() > 0);
verify(inner); verify(inner);
} }

View File

@ -94,9 +94,9 @@ public class MeteredWindowStoreTest {
store.init(context, store); store.init(context, store);
final JmxReporter reporter = new JmxReporter("kafka.streams"); final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter); metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", context.taskId().toString(), "scope", "mocked-store"))); "scope", "test", context.taskId().toString(), "scope", "mocked-store")));
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-state-metrics,client-id=%s,task-id=%s,%s-state-id=%s",
"scope", "test", context.taskId().toString(), "scope", "all"))); "scope", "test", context.taskId().toString(), "scope", "all")));
} }
@ -107,8 +107,8 @@ public class MeteredWindowStoreTest {
replay(innerStoreMock); replay(innerStoreMock);
store.init(context, store); store.init(context, store);
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "mocked-store")).metricValue());
} }
@Test @Test
@ -121,8 +121,8 @@ public class MeteredWindowStoreTest {
store.init(context, store); store.init(context, store);
store.put("a", "a"); store.put("a", "a");
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "mocked-store")).metricValue());
verify(innerStoreMock); verify(innerStoreMock);
} }
@ -134,8 +134,8 @@ public class MeteredWindowStoreTest {
store.init(context, store); store.init(context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "mocked-store")).metricValue());
verify(innerStoreMock); verify(innerStoreMock);
} }
@ -147,8 +147,8 @@ public class MeteredWindowStoreTest {
store.init(context, store); store.init(context, store);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "mocked-store")).metricValue());
verify(innerStoreMock); verify(innerStoreMock);
} }
@ -161,8 +161,8 @@ public class MeteredWindowStoreTest {
store.init(context, store); store.init(context, store);
store.flush(); store.flush();
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-state-metrics", singletonMap("scope-state-id", "mocked-store")).metricValue());
verify(innerStoreMock); verify(innerStoreMock);
} }