diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java index 2ef50d54abf..bdeed2f8978 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java @@ -66,6 +66,7 @@ public class OpenIterators { public void remove(final MeteredIterator iterator) { if (numOpenIterators.intValue() == 1) { streamsMetrics.removeMetric(metricName); + streamsMetrics.removeStoreLevelMetric(metricName); } numOpenIterators.decrement(); openIterators.remove(iterator); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 7c22c94e9af..a0999a36c60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -41,12 +41,14 @@ import java.util.Deque; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; public class StreamsMetricsImpl implements StreamsMetrics { @@ -339,6 +341,30 @@ public class StreamsMetricsImpl implements StreamsMetrics { metrics.removeMetric(metricName); } + public void removeStoreLevelMetric(final MetricName metricName) { + metrics.removeMetric(metricName); + + final List metricsScopeCandidates = metricName.tags().keySet().stream() + .filter(tag -> !tag.equals(THREAD_ID_TAG) && !tag.equals(TASK_ID_TAG)) + .collect(Collectors.toList()); + if (metricsScopeCandidates.size() != 1) { + // should never happen + throw new IllegalStateException("Expected exactly one metric scope tag, but found " + metricsScopeCandidates); + } + + final Deque metricsForStore = storeLevelMetrics.get( + storeSensorPrefix( + metricName.tags().get(THREAD_ID_TAG), + metricName.tags().get(TASK_ID_TAG), + metricName.tags().get(metricsScopeCandidates.get(0)) + ) + ); + + if (metricsForStore != null) { + metricsForStore.remove(metricName); + } + } + public Map taskLevelTagMap(final String threadId, final String taskId) { final Map tagMap = new LinkedHashMap<>(); tagMap.put(THREAD_ID_TAG, threadId); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 8678111f989..32d1ee91437 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -482,9 +482,11 @@ public class MeteredKeyValueStore private final long startTimestamp; private final Function valueDeserializer; - private MeteredKeyValueTimestampedIterator(final KeyValueIterator iter, - final Sensor sensor, - final Function valueDeserializer) { + private MeteredKeyValueTimestampedIterator( + final KeyValueIterator iter, + final Sensor sensor, + final Function valueDeserializer + ) { this.iter = iter; this.sensor = sensor; this.valueDeserializer = valueDeserializer; diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java index 3464ecbdaee..daaacb7bec6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java @@ -58,11 +58,11 @@ public class OpenIteratorsTest { assertThat(gauge.value(null, 0), is(2L)); openIterators.remove(meteredIterator2); - verify(streamsMetrics, never()).removeMetric(any()); + verify(streamsMetrics, never()).removeStoreLevelMetric(any()); assertThat(gauge.value(null, 0), is(5L)); openIterators.remove(meteredIterator1); - verify(streamsMetrics).removeMetric(any()); + verify(streamsMetrics).removeStoreLevelMetric(any()); assertThat(gauge.value(null, 0), is(5L)); openIterators.add(meteredIterator3);