KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
CI / build (push) Has been cancelled Details

This PR fixes a leak in StreamsMetricImpl not removing a
store-level-metric correctly, and thus leaking objects.

Reviewers: Eduwer Camacaro <eduwerc@gmail.com>, Bill Bejeck
 <bbejeck@apache.org>
This commit is contained in:
Matthias J. Sax 2025-10-03 15:27:51 -07:00
parent a10c1f3ea1
commit ce248ab0d6
4 changed files with 34 additions and 5 deletions

View File

@ -66,6 +66,7 @@ public class OpenIterators {
public void remove(final MeteredIterator iterator) {
if (numOpenIterators.intValue() == 1) {
streamsMetrics.removeMetric(metricName);
streamsMetrics.removeStoreLevelMetric(metricName);
}
numOpenIterators.decrement();
openIterators.remove(iterator);

View File

@ -41,12 +41,14 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class StreamsMetricsImpl implements StreamsMetrics {
@ -339,6 +341,30 @@ public class StreamsMetricsImpl implements StreamsMetrics {
metrics.removeMetric(metricName);
}
public void removeStoreLevelMetric(final MetricName metricName) {
metrics.removeMetric(metricName);
final List<String> metricsScopeCandidates = metricName.tags().keySet().stream()
.filter(tag -> !tag.equals(THREAD_ID_TAG) && !tag.equals(TASK_ID_TAG))
.collect(Collectors.toList());
if (metricsScopeCandidates.size() != 1) {
// should never happen
throw new IllegalStateException("Expected exactly one metric scope tag, but found " + metricsScopeCandidates);
}
final Deque<MetricName> metricsForStore = storeLevelMetrics.get(
storeSensorPrefix(
metricName.tags().get(THREAD_ID_TAG),
metricName.tags().get(TASK_ID_TAG),
metricName.tags().get(metricsScopeCandidates.get(0))
)
);
if (metricsForStore != null) {
metricsForStore.remove(metricName);
}
}
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId);

View File

@ -482,9 +482,11 @@ public class MeteredKeyValueStore<K, V>
private final long startTimestamp;
private final Function<byte[], V> valueDeserializer;
private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor,
final Function<byte[], V> valueDeserializer) {
private MeteredKeyValueTimestampedIterator(
final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor,
final Function<byte[], V> valueDeserializer
) {
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;

View File

@ -58,11 +58,11 @@ public class OpenIteratorsTest {
assertThat(gauge.value(null, 0), is(2L));
openIterators.remove(meteredIterator2);
verify(streamsMetrics, never()).removeMetric(any());
verify(streamsMetrics, never()).removeStoreLevelMetric(any());
assertThat(gauge.value(null, 0), is(5L));
openIterators.remove(meteredIterator1);
verify(streamsMetrics).removeMetric(any());
verify(streamsMetrics).removeStoreLevelMetric(any());
assertThat(gauge.value(null, 0), is(5L));
openIterators.add(meteredIterator3);