KAFKA-15541: Add oldest-iterator-open-since-ms metric (#16041)

Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

This new `StateStore` metric tracks the timestamp that the oldest
surviving Iterator was created.

This timestamp should continue to climb, and closely track the current
time, as old iterators are closed and new ones created. If the timestamp
remains very low (i.e. old), that suggests an Iterator has leaked, which
should enable users to isolate the affected store.

It will report no data when there are no currently open Iterators.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Nick Telford 2024-05-29 00:23:23 +01:00 committed by GitHub
parent 4eb60b5104
commit 59ba555142
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 373 additions and 30 deletions

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
/**
* Common super-interface of all Metered Iterator types.
*
* This enables tracking the timestamp the Iterator was first created, for the oldest-iterator-open-since-ms metric.
*/
public interface MeteredIterator {
/**
* @return The UNIX timestamp, in milliseconds, that this Iterator was created/opened.
*/
long startTimestamp();
}

View File

@ -49,9 +49,12 @@ import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
@ -96,6 +99,7 @@ public class MeteredKeyValueStore<K, V>
private TaskId taskId;
protected LongAdder numOpenIterators = new LongAdder();
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@ -169,6 +173,9 @@ public class MeteredKeyValueStore<K, V>
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
);
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@ -456,18 +463,26 @@ public class MeteredKeyValueStore<K, V>
}
}
private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
private class MeteredKeyValueIterator implements KeyValueIterator<K, V>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestamp;
private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor) {
this.iter = iter;
this.sensor = sensor;
this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return startTimestamp;
}
@Override
@ -492,6 +507,7 @@ public class MeteredKeyValueStore<K, V>
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}
@ -501,11 +517,12 @@ public class MeteredKeyValueStore<K, V>
}
}
private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V> {
private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestamp;
private final Function<byte[], V> valueDeserializer;
private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
@ -514,8 +531,15 @@ public class MeteredKeyValueStore<K, V>
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return startTimestamp;
}
@Override
@ -540,6 +564,7 @@ public class MeteredKeyValueStore<K, V>
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import java.util.concurrent.atomic.LongAdder;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.metrics.Sensor;
@ -24,7 +25,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.VersionedRecord;
public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V> {
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {
private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
@ -32,21 +33,31 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
private final Sensor sensor;
private final Time time;
private final long startNs;
private final long startTimestampMs;
private final Set<MeteredIterator> openIterators;
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
final Sensor sensor,
final Time time,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
final LongAdder numOpenIterators) {
final LongAdder numOpenIterators,
final Set<MeteredIterator> openIterators) {
this.iterator = iterator;
this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators;
this.openIterators = openIterators;
this.sensor = sensor;
this.time = time;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return startTimestampMs;
}
@Override
public void close() {
@ -55,6 +66,7 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

View File

@ -45,8 +45,11 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -73,6 +76,7 @@ public class MeteredSessionStore<K, V>
private TaskId taskId;
private LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@ -138,6 +142,9 @@ public class MeteredSessionStore<K, V>
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
);
}
@ -257,7 +264,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -271,7 +279,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -286,7 +295,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -300,7 +310,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -321,7 +332,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -342,7 +354,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -365,7 +378,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -379,7 +393,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -402,7 +417,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -474,7 +490,8 @@ public class MeteredSessionStore<K, V>
serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
time,
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);

View File

@ -307,11 +307,12 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V> {
private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestampMs;
private final Function<byte[], ValueAndTimestamp<V>> valueAndTimestampDeserializer;
private final boolean returnPlainValue;
@ -324,8 +325,15 @@ public class MeteredTimestampedKeyValueStore<K, V>
this.sensor = sensor;
this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
this.returnPlainValue = returnPlainValue;
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return startTimestampMs;
}
@Override
@ -354,6 +362,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

View File

@ -269,7 +269,8 @@ public class MeteredVersionedKeyValueStore<K, V>
iteratorDurationSensor,
time,
StoreQueryUtils.getDeserializeValue(plainValueSerdes),
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);

View File

@ -47,8 +47,11 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -77,6 +80,7 @@ public class MeteredWindowStore<K, V>
private TaskId taskId;
private LongAdder numOpenIterators = new LongAdder();
private NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@ -157,6 +161,9 @@ public class MeteredWindowStore<K, V>
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
);
}
@Deprecated
@ -245,7 +252,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -261,7 +269,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -282,7 +291,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -302,7 +312,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -316,7 +327,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -330,7 +342,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}
@Override
@ -343,7 +356,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -357,7 +371,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}
@ -435,7 +450,8 @@ public class MeteredWindowStore<K, V>
serdes::keyFrom,
getDeserializeValue(serdes, wrapped()),
time,
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
@ -486,7 +502,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
getDeserializeValue(serdes, wrapped()),
time,
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);

View File

@ -23,9 +23,10 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.concurrent.atomic.LongAdder;
import java.util.Set;
import java.util.function.Function;
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V>, MeteredIterator {
private final WindowStoreIterator<byte[]> iter;
private final Sensor operationSensor;
@ -33,8 +34,10 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final StreamsMetrics metrics;
private final Function<byte[], V> valueFrom;
private final long startNs;
private final long startTimestampMs;
private final Time time;
private final LongAdder numOpenIterators;
private final Set<MeteredIterator> openIterators;
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor operationSensor,
@ -42,16 +45,25 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
final StreamsMetrics metrics,
final Function<byte[], V> valueFrom,
final Time time,
final LongAdder numOpenIterators) {
final LongAdder numOpenIterators,
final Set<MeteredIterator> openIterators) {
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
this.metrics = metrics;
this.valueFrom = valueFrom;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
this.openIterators = openIterators;
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return startTimestampMs;
}
@Override
@ -74,6 +86,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
operationSensor.record(duration);
iteratorSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

View File

@ -25,9 +25,10 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.concurrent.atomic.LongAdder;
import java.util.Set;
import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V>, MeteredIterator {
private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
private final Sensor operationSensor;
@ -36,8 +37,10 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
private final Function<byte[], K> deserializeKey;
private final Function<byte[], V> deserializeValue;
private final long startNs;
private final long startTimestampMs;
private final Time time;
private final LongAdder numOpenIterators;
private final Set<MeteredIterator> openIterators;
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
final Sensor operationSensor,
@ -46,7 +49,8 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
final Function<byte[], K> deserializeKey,
final Function<byte[], V> deserializeValue,
final Time time,
final LongAdder numOpenIterators) {
final LongAdder numOpenIterators,
final Set<MeteredIterator> openIterators) {
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
@ -54,9 +58,17 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
this.deserializeKey = deserializeKey;
this.deserializeValue = deserializeValue;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
this.openIterators = openIterators;
numOpenIterators.increment();
openIterators.add(this);
}
@Override
public long startTimestamp() {
return this.startTimestampMs;
}
@Override
@ -84,6 +96,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
operationSensor.record(duration);
iteratorSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

View File

@ -157,6 +157,10 @@ public class StateStoreMetrics {
private static final String ITERATOR_DURATION_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;
private static final String OLDEST_ITERATOR_OPEN_SINCE_MS = "oldest-iterator-open-since-ms";
private static final String OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION =
"The UNIX timestamp the oldest still open iterator was created, in milliseconds";
public static Sensor putSensor(final String taskId,
final String storeType,
final String storeName,
@ -451,6 +455,22 @@ public class StateStoreMetrics {
}
public static void addOldestOpenIteratorGauge(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Long> oldestOpenIteratorGauge) {
streamsMetrics.addStoreLevelMutableMetric(
taskId,
storeType,
storeName,
OLDEST_ITERATOR_OPEN_SINCE_MS,
OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
RecordingLevel.INFO,
oldestOpenIteratorGauge
);
}
private static Sensor sizeOrCountSensor(final String taskId,
final String storeType,
final String storeName,

View File

@ -490,6 +490,42 @@ public class MeteredKeyValueStoreTest {
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
@Test
public void shouldTrackOldestOpenIteratorTimestamp() {
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<String, String> second = null;
final long secondTimestamp;
try {
try (final KeyValueIterator<String, String> first = metered.all()) {
final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
// open a second iterator before closing the first to test that we still produce the first iterator's timestamp
second = metered.all();
secondTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
}
// now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
}
private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}

View File

@ -654,6 +654,42 @@ public class MeteredSessionStoreTest {
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
@Test
public void shouldTrackOldestOpenIteratorTimestamp() {
when(innerStore.backwardFetch(KEY_BYTES)).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<Windowed<String>, String> second = null;
final long secondTimestamp;
try {
try (final KeyValueIterator<Windowed<String>, String> first = store.backwardFetch(KEY)) {
final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
// open a second iterator before closing the first to test that we still produce the first iterator's timestamp
second = store.backwardFetch(KEY);
secondTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
}
// now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
}
private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", this.tags));
}

View File

@ -487,4 +487,40 @@ public class MeteredTimestampedKeyValueStoreTest {
assertThat((double) iteratorDurationAvgMetric.metricValue(), equalTo(2.5 * TimeUnit.MILLISECONDS.toNanos(1)));
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
@Test
public void shouldTrackOldestOpenIteratorTimestamp() {
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
final long secondTimestamp;
try {
try (final KeyValueIterator<String, ValueAndTimestamp<String>> first = metered.all()) {
final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
// open a second iterator before closing the first to test that we still produce the first iterator's timestamp
second = metered.all();
secondTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
}
// now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
}
}

View File

@ -426,6 +426,48 @@ public class MeteredVersionedKeyValueStoreTest {
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
@Test
public void shouldTrackOldestOpenIteratorTimestamp() {
final MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
final PositionBound bound = PositionBound.unbounded();
final QueryConfig config = new QueryConfig(false);
when(inner.query(any(), any(), any())).thenReturn(
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
final KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
VersionedRecordIterator<String> secondIterator = null;
final long secondTime;
try {
try (final VersionedRecordIterator<String> iterator = first.getResult()) {
final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
// open a second iterator before closing the first to test that we still produce the first iterator's timestamp
final QueryResult<VersionedRecordIterator<String>> second = store.query(query, bound, config);
secondIterator = second.getResult();
secondTime = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
}
// now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTime));
} finally {
if (secondIterator != null) {
secondIterator.close();
}
}
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
}
private KafkaMetric getMetric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}

View File

@ -495,6 +495,42 @@ public class MeteredWindowStoreTest {
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
@Test
public void shouldTrackOldestOpenIteratorTimestamp() {
when(innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
KeyValueIterator<Windowed<String>, String> second = null;
final long secondTimestamp;
try {
try (final KeyValueIterator<Windowed<String>, String> first = store.all()) {
final long oldestTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
// open a second iterator before closing the first to test that we still produce the first iterator's timestamp
second = store.all();
secondTimestamp = mockTime.milliseconds();
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
mockTime.sleep(100);
}
// now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
} finally {
if (second != null) {
second.close();
}
}
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
}
private KafkaMetric metric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}