From 0dd84711743e8e0bd2e3840d1320896fa38d1a43 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Oct 2025 15:27:51 -0700 Subject: [PATCH] KAFKA-19748: fix metrics leak in Kafka Streams (#20633) This PR fixes a leak in StreamsMetricImpl not removing a store-level-metric correctly, and thus leaking objects. Reviewers: Eduwer Camacaro , Bill Bejeck --- .../internals/metrics/OpenIterators.java | 2 +- .../internals/metrics/StreamsMetricsImpl.java | 26 +++++++++++++++++++ .../state/internals/MeteredKeyValueStore.java | 8 +++--- .../internals/metrics/OpenIteratorsTest.java | 4 +-- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java index 736af467bd2..b3cb052b104 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java @@ -62,7 +62,7 @@ public class OpenIterators { public void remove(final MeteredIterator iterator) { if (openIterators.size() == 1) { - streamsMetrics.removeMetric(metricName); + streamsMetrics.removeStoreLevelMetric(metricName); } openIterators.remove(iterator); updateOldestStartTimestamp(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 7c22c94e9af..a0999a36c60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -41,12 +41,14 @@ import java.util.Deque; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; public class StreamsMetricsImpl implements StreamsMetrics { @@ -339,6 +341,30 @@ public class StreamsMetricsImpl implements StreamsMetrics { metrics.removeMetric(metricName); } + public void removeStoreLevelMetric(final MetricName metricName) { + metrics.removeMetric(metricName); + + final List metricsScopeCandidates = metricName.tags().keySet().stream() + .filter(tag -> !tag.equals(THREAD_ID_TAG) && !tag.equals(TASK_ID_TAG)) + .collect(Collectors.toList()); + if (metricsScopeCandidates.size() != 1) { + // should never happen + throw new IllegalStateException("Expected exactly one metric scope tag, but found " + metricsScopeCandidates); + } + + final Deque metricsForStore = storeLevelMetrics.get( + storeSensorPrefix( + metricName.tags().get(THREAD_ID_TAG), + metricName.tags().get(TASK_ID_TAG), + metricName.tags().get(metricsScopeCandidates.get(0)) + ) + ); + + if (metricsForStore != null) { + metricsForStore.remove(metricName); + } + } + public Map taskLevelTagMap(final String threadId, final String taskId) { final Map tagMap = new LinkedHashMap<>(); tagMap.put(THREAD_ID_TAG, threadId); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 9c033d6bbd5..a89bf0c62c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -487,9 +487,11 @@ public class MeteredKeyValueStore private final long startTimestamp; private final Function valueDeserializer; - private MeteredKeyValueTimestampedIterator(final KeyValueIterator iter, - final Sensor sensor, - final Function valueDeserializer) { + private MeteredKeyValueTimestampedIterator( + final KeyValueIterator iter, + final Sensor sensor, + final Function valueDeserializer + ) { this.iter = iter; this.sensor = sensor; this.valueDeserializer = valueDeserializer; diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java index 3464ecbdaee..daaacb7bec6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java @@ -58,11 +58,11 @@ public class OpenIteratorsTest { assertThat(gauge.value(null, 0), is(2L)); openIterators.remove(meteredIterator2); - verify(streamsMetrics, never()).removeMetric(any()); + verify(streamsMetrics, never()).removeStoreLevelMetric(any()); assertThat(gauge.value(null, 0), is(5L)); openIterators.remove(meteredIterator1); - verify(streamsMetrics).removeMetric(any()); + verify(streamsMetrics).removeStoreLevelMetric(any()); assertThat(gauge.value(null, 0), is(5L)); openIterators.add(meteredIterator3);