KAFKA-19398: (De)Register oldest-iterator-open-since-ms metric dynamically (#20022)

The metric for oldest-iterator-open-since-ms might report a null value
if there is not open iterator.

This PR changes the behavior to dynamically register/deregister the
entire metric instead of allowing it to return a null value.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Matthias J. Sax 2025-06-24 17:01:36 -07:00
parent b80aa15c17
commit fb054b590e
10 changed files with 121 additions and 58 deletions

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.MeteredIterator;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
public class OpenIterators {
private final TaskId taskId;
private final String metricsScope;
private final String name;
private final StreamsMetricsImpl streamsMetrics;
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
private MetricName metricName;
public OpenIterators(final TaskId taskId,
final String metricsScope,
final String name,
final StreamsMetricsImpl streamsMetrics) {
this.taskId = taskId;
this.metricsScope = metricsScope;
this.name = name;
this.streamsMetrics = streamsMetrics;
}
public void add(final MeteredIterator iterator) {
openIterators.add(iterator);
numOpenIterators.increment();
if (numOpenIterators.intValue() == 1) {
metricName = StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name, streamsMetrics,
(config, now) -> openIterators.first().startTimestamp()
);
}
}
public void remove(final MeteredIterator iterator) {
if (numOpenIterators.intValue() == 1) {
streamsMetrics.removeMetric(metricName);
}
numOpenIterators.decrement();
openIterators.remove(iterator);
}
public long sum() {
return numOpenIterators.sum();
}
}

View File

@ -335,6 +335,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
} }
} }
public void removeMetric(final MetricName metricName) {
metrics.removeMetric(metricName);
}
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) { public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
final Map<String, String> tagMap = new LinkedHashMap<>(); final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId); tagMap.put(THREAD_ID_TAG, threadId);
@ -517,7 +521,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, recordingLevel, parents); return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, recordingLevel, parents);
} }
public <T> void addStoreLevelMutableMetric(final String taskId, public <T> MetricName addStoreLevelMutableMetric(final String taskId,
final String metricsScope, final String metricsScope,
final String storeName, final String storeName,
final String name, final String name,
@ -535,6 +539,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName); final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
} }
return metricName;
} }
public final void removeAllStoreLevelSensorsAndMetrics(final String taskId, public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.internals.metrics.OpenIterators;
import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
@ -48,14 +49,9 @@ import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -96,8 +92,7 @@ public class MeteredKeyValueStore<K, V>
private StreamsMetricsImpl streamsMetrics; private StreamsMetricsImpl streamsMetrics;
private TaskId taskId; private TaskId taskId;
protected LongAdder numOpenIterators = new LongAdder(); protected OpenIterators openIterators;
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers = private final Map<Class, QueryHandler> queryHandlers =
@ -153,13 +148,8 @@ public class MeteredKeyValueStore<K, V>
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum()); (config, now) -> openIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
(config, now) -> {
final Iterator<MeteredIterator> openIteratorsIterator = openIterators.iterator();
return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
}
);
} }
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) { protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@ -445,7 +435,6 @@ public class MeteredKeyValueStore<K, V>
this.sensor = sensor; this.sensor = sensor;
this.startTimestamp = time.milliseconds(); this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this); openIterators.add(this);
} }
@ -475,7 +464,6 @@ public class MeteredKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this); openIterators.remove(this);
} }
} }
@ -502,7 +490,6 @@ public class MeteredKeyValueStore<K, V>
this.valueDeserializer = valueDeserializer; this.valueDeserializer = valueDeserializer;
this.startTimestamp = time.milliseconds(); this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this); openIterators.add(this);
} }
@ -532,7 +519,6 @@ public class MeteredKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this); openIterators.remove(this);
} }
} }

View File

@ -18,39 +18,34 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.internals.metrics.OpenIterators;
import org.apache.kafka.streams.state.VersionedRecord; import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator; import org.apache.kafka.streams.state.VersionedRecordIterator;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator { class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {
private final VersionedRecordIterator<byte[]> iterator; private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue; private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
private final LongAdder numOpenIterators;
private final Sensor sensor; private final Sensor sensor;
private final Time time; private final Time time;
private final long startNs; private final long startNs;
private final long startTimestampMs; private final long startTimestampMs;
private final Set<MeteredIterator> openIterators; private final OpenIterators openIterators;
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator, public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
final Sensor sensor, final Sensor sensor,
final Time time, final Time time,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue, final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
final LongAdder numOpenIterators, final OpenIterators openIterators) {
final Set<MeteredIterator> openIterators) {
this.iterator = iterator; this.iterator = iterator;
this.deserializeValue = deserializeValue; this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators;
this.openIterators = openIterators; this.openIterators = openIterators;
this.sensor = sensor; this.sensor = sensor;
this.time = time; this.time = time;
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds(); this.startTimestampMs = time.milliseconds();
numOpenIterators.increment();
openIterators.add(this); openIterators.add(this);
} }
@ -65,7 +60,6 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterato
iterator.close(); iterator.close();
} finally { } finally {
sensor.record(time.nanoseconds() - startNs); sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrement();
openIterators.remove(this); openIterators.remove(this);
} }
} }

View File

@ -326,7 +326,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds(); this.startTimestampMs = time.milliseconds();
this.returnPlainValue = returnPlainValue; this.returnPlainValue = returnPlainValue;
numOpenIterators.increment();
openIterators.add(this); openIterators.add(this);
} }
@ -360,7 +359,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this); openIterators.remove(this);
} }
} }

View File

@ -269,7 +269,6 @@ public class MeteredVersionedKeyValueStore<K, V>
iteratorDurationSensor, iteratorDurationSensor,
time, time,
StoreQueryUtils.deserializeValue(plainValueSerdes), StoreQueryUtils.deserializeValue(plainValueSerdes),
numOpenIterators,
openIterators openIterators
); );
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult = final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.streams.state.internals.metrics; package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
@ -455,12 +456,12 @@ public class StateStoreMetrics {
} }
public static void addOldestOpenIteratorGauge(final String taskId, public static MetricName addOldestOpenIteratorGauge(final String taskId,
final String storeType, final String storeType,
final String storeName, final String storeName,
final StreamsMetricsImpl streamsMetrics, final StreamsMetricsImpl streamsMetrics,
final Gauge<Long> oldestOpenIteratorGauge) { final Gauge<Long> oldestOpenIteratorGauge) {
streamsMetrics.addStoreLevelMutableMetric( return streamsMetrics.addStoreLevelMutableMetric(
taskId, taskId,
storeType, storeType,
storeName, storeName,

View File

@ -525,15 +525,16 @@ public class MeteredKeyValueStoreTest {
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
init(); init();
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue())); assertThat(oldestIteratorTimestampMetric, nullValue());
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<String, String> second = null; KeyValueIterator<String, String> second = null;
final long secondTimestamp; final long secondTimestamp;
try { try {
try (final KeyValueIterator<String, String> unused = metered.all()) { try (final KeyValueIterator<String, String> unused = metered.all()) {
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
final long oldestTimestamp = mockTime.milliseconds(); final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100); mockTime.sleep(100);
@ -553,7 +554,8 @@ public class MeteredKeyValueStoreTest {
} }
} }
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, nullValue());
} }
private KafkaMetric metric(final MetricName metricName) { private KafkaMetric metric(final MetricName metricName) {

View File

@ -503,15 +503,16 @@ public class MeteredTimestampedKeyValueStoreTest {
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
init(); init();
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue())); assertThat(oldestIteratorTimestampMetric, nullValue());
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<String, ValueAndTimestamp<String>> second = null; KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
final long secondTimestamp; final long secondTimestamp;
try { try {
try (final KeyValueIterator<String, ValueAndTimestamp<String>> unused = metered.all()) { try (final KeyValueIterator<String, ValueAndTimestamp<String>> unused = metered.all()) {
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
final long oldestTimestamp = mockTime.milliseconds(); final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100); mockTime.sleep(100);
@ -531,6 +532,7 @@ public class MeteredTimestampedKeyValueStoreTest {
} }
} }
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, nullValue());
} }
} }

View File

@ -426,16 +426,17 @@ public class MeteredVersionedKeyValueStoreTest {
when(inner.query(any(), any(), any())).thenReturn( when(inner.query(any(), any(), any())).thenReturn(
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY))); QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
final KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms"); KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue())); assertThat(oldestIteratorTimestampMetric, nullValue());
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config); final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
VersionedRecordIterator<String> secondIterator = null; VersionedRecordIterator<String> secondIterator = null;
final long secondTime; final long secondTime;
try { try {
try (final VersionedRecordIterator<String> unused = first.getResult()) { try (final VersionedRecordIterator<String> unused = first.getResult()) {
oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
final long oldestTimestamp = mockTime.milliseconds(); final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100); mockTime.sleep(100);
@ -457,7 +458,8 @@ public class MeteredVersionedKeyValueStoreTest {
} }
} }
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, nullValue());
} }
private KafkaMetric getMetric(final String name) { private KafkaMetric getMetric(final String name) {