mirror of https://github.com/apache/kafka.git
KAFKA-19398: (De)Register oldest-iterator-open-since-ms metric dynamically (#20022)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
The metric for oldest-iterator-open-since-ms might report a null value if there is not open iterator. This PR changes the behavior to dynamically register/deregister the entire metric instead of allowing it to return a null value. Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
parent
6b2013a001
commit
4387132926
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.streams.internals.metrics;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.MetricName;
|
||||||
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
import org.apache.kafka.streams.state.internals.MeteredIterator;
|
||||||
|
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.NavigableSet;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
|
public class OpenIterators {
|
||||||
|
private final TaskId taskId;
|
||||||
|
private final String metricsScope;
|
||||||
|
private final String name;
|
||||||
|
private final StreamsMetricsImpl streamsMetrics;
|
||||||
|
|
||||||
|
private final LongAdder numOpenIterators = new LongAdder();
|
||||||
|
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
|
||||||
|
|
||||||
|
private MetricName metricName;
|
||||||
|
|
||||||
|
public OpenIterators(final TaskId taskId,
|
||||||
|
final String metricsScope,
|
||||||
|
final String name,
|
||||||
|
final StreamsMetricsImpl streamsMetrics) {
|
||||||
|
this.taskId = taskId;
|
||||||
|
this.metricsScope = metricsScope;
|
||||||
|
this.name = name;
|
||||||
|
this.streamsMetrics = streamsMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(final MeteredIterator iterator) {
|
||||||
|
openIterators.add(iterator);
|
||||||
|
numOpenIterators.increment();
|
||||||
|
|
||||||
|
if (numOpenIterators.intValue() == 1) {
|
||||||
|
metricName = StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name, streamsMetrics,
|
||||||
|
(config, now) -> openIterators.first().startTimestamp()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove(final MeteredIterator iterator) {
|
||||||
|
if (numOpenIterators.intValue() == 1) {
|
||||||
|
streamsMetrics.removeMetric(metricName);
|
||||||
|
}
|
||||||
|
numOpenIterators.decrement();
|
||||||
|
openIterators.remove(iterator);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long sum() {
|
||||||
|
return numOpenIterators.sum();
|
||||||
|
}
|
||||||
|
}
|
|
@ -335,6 +335,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeMetric(final MetricName metricName) {
|
||||||
|
metrics.removeMetric(metricName);
|
||||||
|
}
|
||||||
|
|
||||||
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
|
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
|
||||||
final Map<String, String> tagMap = new LinkedHashMap<>();
|
final Map<String, String> tagMap = new LinkedHashMap<>();
|
||||||
tagMap.put(THREAD_ID_TAG, threadId);
|
tagMap.put(THREAD_ID_TAG, threadId);
|
||||||
|
@ -517,7 +521,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, recordingLevel, parents);
|
return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, recordingLevel, parents);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> void addStoreLevelMutableMetric(final String taskId,
|
public <T> MetricName addStoreLevelMutableMetric(final String taskId,
|
||||||
final String metricsScope,
|
final String metricsScope,
|
||||||
final String storeName,
|
final String storeName,
|
||||||
final String name,
|
final String name,
|
||||||
|
@ -535,6 +539,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
|
final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
|
||||||
storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
|
storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return metricName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,
|
public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.streams.KeyValue;
|
import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.errors.ProcessorStateException;
|
import org.apache.kafka.streams.errors.ProcessorStateException;
|
||||||
|
import org.apache.kafka.streams.internals.metrics.OpenIterators;
|
||||||
import org.apache.kafka.streams.kstream.internals.Change;
|
import org.apache.kafka.streams.kstream.internals.Change;
|
||||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
||||||
import org.apache.kafka.streams.processor.StateStore;
|
import org.apache.kafka.streams.processor.StateStore;
|
||||||
|
@ -48,14 +49,9 @@ import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
|
||||||
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
|
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableSet;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentSkipListSet;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||||
|
@ -96,8 +92,7 @@ public class MeteredKeyValueStore<K, V>
|
||||||
private StreamsMetricsImpl streamsMetrics;
|
private StreamsMetricsImpl streamsMetrics;
|
||||||
private TaskId taskId;
|
private TaskId taskId;
|
||||||
|
|
||||||
protected LongAdder numOpenIterators = new LongAdder();
|
protected OpenIterators openIterators;
|
||||||
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
|
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
private final Map<Class, QueryHandler> queryHandlers =
|
private final Map<Class, QueryHandler> queryHandlers =
|
||||||
|
@ -153,13 +148,8 @@ public class MeteredKeyValueStore<K, V>
|
||||||
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||||
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||||
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||||
(config, now) -> numOpenIterators.sum());
|
(config, now) -> openIterators.sum());
|
||||||
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
|
||||||
(config, now) -> {
|
|
||||||
final Iterator<MeteredIterator> openIteratorsIterator = openIterators.iterator();
|
|
||||||
return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
|
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
|
||||||
|
@ -445,7 +435,6 @@ public class MeteredKeyValueStore<K, V>
|
||||||
this.sensor = sensor;
|
this.sensor = sensor;
|
||||||
this.startTimestamp = time.milliseconds();
|
this.startTimestamp = time.milliseconds();
|
||||||
this.startNs = time.nanoseconds();
|
this.startNs = time.nanoseconds();
|
||||||
numOpenIterators.increment();
|
|
||||||
openIterators.add(this);
|
openIterators.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,7 +464,6 @@ public class MeteredKeyValueStore<K, V>
|
||||||
final long duration = time.nanoseconds() - startNs;
|
final long duration = time.nanoseconds() - startNs;
|
||||||
sensor.record(duration);
|
sensor.record(duration);
|
||||||
iteratorDurationSensor.record(duration);
|
iteratorDurationSensor.record(duration);
|
||||||
numOpenIterators.decrement();
|
|
||||||
openIterators.remove(this);
|
openIterators.remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -502,7 +490,6 @@ public class MeteredKeyValueStore<K, V>
|
||||||
this.valueDeserializer = valueDeserializer;
|
this.valueDeserializer = valueDeserializer;
|
||||||
this.startTimestamp = time.milliseconds();
|
this.startTimestamp = time.milliseconds();
|
||||||
this.startNs = time.nanoseconds();
|
this.startNs = time.nanoseconds();
|
||||||
numOpenIterators.increment();
|
|
||||||
openIterators.add(this);
|
openIterators.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,7 +519,6 @@ public class MeteredKeyValueStore<K, V>
|
||||||
final long duration = time.nanoseconds() - startNs;
|
final long duration = time.nanoseconds() - startNs;
|
||||||
sensor.record(duration);
|
sensor.record(duration);
|
||||||
iteratorDurationSensor.record(duration);
|
iteratorDurationSensor.record(duration);
|
||||||
numOpenIterators.decrement();
|
|
||||||
openIterators.remove(this);
|
openIterators.remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,39 +18,34 @@ package org.apache.kafka.streams.state.internals;
|
||||||
|
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
|
import org.apache.kafka.streams.internals.metrics.OpenIterators;
|
||||||
import org.apache.kafka.streams.state.VersionedRecord;
|
import org.apache.kafka.streams.state.VersionedRecord;
|
||||||
import org.apache.kafka.streams.state.VersionedRecordIterator;
|
import org.apache.kafka.streams.state.VersionedRecordIterator;
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {
|
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {
|
||||||
|
|
||||||
private final VersionedRecordIterator<byte[]> iterator;
|
private final VersionedRecordIterator<byte[]> iterator;
|
||||||
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
|
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
|
||||||
private final LongAdder numOpenIterators;
|
|
||||||
private final Sensor sensor;
|
private final Sensor sensor;
|
||||||
private final Time time;
|
private final Time time;
|
||||||
private final long startNs;
|
private final long startNs;
|
||||||
private final long startTimestampMs;
|
private final long startTimestampMs;
|
||||||
private final Set<MeteredIterator> openIterators;
|
private final OpenIterators openIterators;
|
||||||
|
|
||||||
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
|
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
|
||||||
final Sensor sensor,
|
final Sensor sensor,
|
||||||
final Time time,
|
final Time time,
|
||||||
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
|
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
|
||||||
final LongAdder numOpenIterators,
|
final OpenIterators openIterators) {
|
||||||
final Set<MeteredIterator> openIterators) {
|
|
||||||
this.iterator = iterator;
|
this.iterator = iterator;
|
||||||
this.deserializeValue = deserializeValue;
|
this.deserializeValue = deserializeValue;
|
||||||
this.numOpenIterators = numOpenIterators;
|
|
||||||
this.openIterators = openIterators;
|
this.openIterators = openIterators;
|
||||||
this.sensor = sensor;
|
this.sensor = sensor;
|
||||||
this.time = time;
|
this.time = time;
|
||||||
this.startNs = time.nanoseconds();
|
this.startNs = time.nanoseconds();
|
||||||
this.startTimestampMs = time.milliseconds();
|
this.startTimestampMs = time.milliseconds();
|
||||||
numOpenIterators.increment();
|
|
||||||
openIterators.add(this);
|
openIterators.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,7 +60,6 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterato
|
||||||
iterator.close();
|
iterator.close();
|
||||||
} finally {
|
} finally {
|
||||||
sensor.record(time.nanoseconds() - startNs);
|
sensor.record(time.nanoseconds() - startNs);
|
||||||
numOpenIterators.decrement();
|
|
||||||
openIterators.remove(this);
|
openIterators.remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,7 +326,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
|
||||||
this.startNs = time.nanoseconds();
|
this.startNs = time.nanoseconds();
|
||||||
this.startTimestampMs = time.milliseconds();
|
this.startTimestampMs = time.milliseconds();
|
||||||
this.returnPlainValue = returnPlainValue;
|
this.returnPlainValue = returnPlainValue;
|
||||||
numOpenIterators.increment();
|
|
||||||
openIterators.add(this);
|
openIterators.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -360,7 +359,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
|
||||||
final long duration = time.nanoseconds() - startNs;
|
final long duration = time.nanoseconds() - startNs;
|
||||||
sensor.record(duration);
|
sensor.record(duration);
|
||||||
iteratorDurationSensor.record(duration);
|
iteratorDurationSensor.record(duration);
|
||||||
numOpenIterators.decrement();
|
|
||||||
openIterators.remove(this);
|
openIterators.remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -269,7 +269,6 @@ public class MeteredVersionedKeyValueStore<K, V>
|
||||||
iteratorDurationSensor,
|
iteratorDurationSensor,
|
||||||
time,
|
time,
|
||||||
StoreQueryUtils.deserializeValue(plainValueSerdes),
|
StoreQueryUtils.deserializeValue(plainValueSerdes),
|
||||||
numOpenIterators,
|
|
||||||
openIterators
|
openIterators
|
||||||
);
|
);
|
||||||
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =
|
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.streams.state.internals.metrics;
|
package org.apache.kafka.streams.state.internals.metrics;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.MetricName;
|
||||||
import org.apache.kafka.common.metrics.Gauge;
|
import org.apache.kafka.common.metrics.Gauge;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
|
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
|
||||||
|
@ -455,12 +456,12 @@ public class StateStoreMetrics {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void addOldestOpenIteratorGauge(final String taskId,
|
public static MetricName addOldestOpenIteratorGauge(final String taskId,
|
||||||
final String storeType,
|
final String storeType,
|
||||||
final String storeName,
|
final String storeName,
|
||||||
final StreamsMetricsImpl streamsMetrics,
|
final StreamsMetricsImpl streamsMetrics,
|
||||||
final Gauge<Long> oldestOpenIteratorGauge) {
|
final Gauge<Long> oldestOpenIteratorGauge) {
|
||||||
streamsMetrics.addStoreLevelMutableMetric(
|
return streamsMetrics.addStoreLevelMutableMetric(
|
||||||
taskId,
|
taskId,
|
||||||
storeType,
|
storeType,
|
||||||
storeName,
|
storeName,
|
||||||
|
|
|
@ -525,15 +525,16 @@ public class MeteredKeyValueStoreTest {
|
||||||
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
|
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
|
||||||
init();
|
init();
|
||||||
|
|
||||||
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
|
|
||||||
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
|
|
||||||
|
|
||||||
KeyValueIterator<String, String> second = null;
|
KeyValueIterator<String, String> second = null;
|
||||||
final long secondTimestamp;
|
final long secondTimestamp;
|
||||||
try {
|
try {
|
||||||
try (final KeyValueIterator<String, String> unused = metered.all()) {
|
try (final KeyValueIterator<String, String> unused = metered.all()) {
|
||||||
|
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
||||||
|
|
||||||
final long oldestTimestamp = mockTime.milliseconds();
|
final long oldestTimestamp = mockTime.milliseconds();
|
||||||
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
||||||
mockTime.sleep(100);
|
mockTime.sleep(100);
|
||||||
|
@ -553,7 +554,8 @@ public class MeteredKeyValueStoreTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
|
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
private KafkaMetric metric(final MetricName metricName) {
|
private KafkaMetric metric(final MetricName metricName) {
|
||||||
|
|
|
@ -503,15 +503,16 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
|
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
|
||||||
init();
|
init();
|
||||||
|
|
||||||
final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
|
|
||||||
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
|
|
||||||
|
|
||||||
KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
|
KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
|
||||||
final long secondTimestamp;
|
final long secondTimestamp;
|
||||||
try {
|
try {
|
||||||
try (final KeyValueIterator<String, ValueAndTimestamp<String>> unused = metered.all()) {
|
try (final KeyValueIterator<String, ValueAndTimestamp<String>> unused = metered.all()) {
|
||||||
|
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
||||||
|
|
||||||
final long oldestTimestamp = mockTime.milliseconds();
|
final long oldestTimestamp = mockTime.milliseconds();
|
||||||
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
||||||
mockTime.sleep(100);
|
mockTime.sleep(100);
|
||||||
|
@ -531,6 +532,7 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
|
oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -426,16 +426,17 @@ public class MeteredVersionedKeyValueStoreTest {
|
||||||
when(inner.query(any(), any(), any())).thenReturn(
|
when(inner.query(any(), any(), any())).thenReturn(
|
||||||
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
|
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
|
||||||
|
|
||||||
final KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
|
KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
|
||||||
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
|
|
||||||
assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
|
|
||||||
|
|
||||||
final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
|
final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
|
||||||
VersionedRecordIterator<String> secondIterator = null;
|
VersionedRecordIterator<String> secondIterator = null;
|
||||||
final long secondTime;
|
final long secondTime;
|
||||||
try {
|
try {
|
||||||
try (final VersionedRecordIterator<String> unused = first.getResult()) {
|
try (final VersionedRecordIterator<String> unused = first.getResult()) {
|
||||||
|
oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, not(nullValue()));
|
||||||
|
|
||||||
final long oldestTimestamp = mockTime.milliseconds();
|
final long oldestTimestamp = mockTime.milliseconds();
|
||||||
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
|
||||||
mockTime.sleep(100);
|
mockTime.sleep(100);
|
||||||
|
@ -457,7 +458,8 @@ public class MeteredVersionedKeyValueStoreTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
|
oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
|
||||||
|
assertThat(oldestIteratorTimestampMetric, nullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
private KafkaMetric getMetric(final String name) {
|
private KafkaMetric getMetric(final String name) {
|
||||||
|
|
Loading…
Reference in New Issue