KAFKA-15541: Add num-open-iterators metric (#15975)

Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

This new `StateStore` metric tracks the number of `Iterator` instances
that have been created, but not yet closed (via `AutoCloseable#close()`).

This will aid users in detecting leaked iterators, which can cause major
performance problems.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Nick Telford 2024-05-22 07:29:50 +01:00 committed by GitHub
parent 271c04bd17
commit 5552f5c26d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 214 additions and 26 deletions

View File

@ -52,6 +52,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -93,6 +94,8 @@ public class MeteredKeyValueStore<K, V>
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
protected AtomicInteger numOpenIterators = new AtomicInteger(0);
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
@ -162,6 +165,8 @@ public class MeteredKeyValueStore<K, V>
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get());
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@ -460,6 +465,7 @@ public class MeteredKeyValueStore<K, V>
this.iter = iter;
this.sensor = sensor;
this.startNs = time.nanoseconds();
numOpenIterators.incrementAndGet();
}
@Override
@ -481,6 +487,7 @@ public class MeteredKeyValueStore<K, V>
iter.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet();
}
}
@ -504,6 +511,7 @@ public class MeteredKeyValueStore<K, V>
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
this.startNs = time.nanoseconds();
numOpenIterators.incrementAndGet();
}
@Override
@ -525,6 +533,7 @@ public class MeteredKeyValueStore<K, V>
iter.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet();
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.VersionedRecord;
@ -24,18 +25,25 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
private final AtomicInteger numOpenIterators;
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue) {
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
final AtomicInteger numOpenIterators) {
this.iterator = iterator;
this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators;
numOpenIterators.incrementAndGet();
}
@Override
public void close() {
try {
iterator.close();
} finally {
numOpenIterators.decrementAndGet();
}
}
@Override

View File

@ -47,6 +47,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -70,6 +71,8 @@ public class MeteredSessionStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
private AtomicInteger numOpenIterators = new AtomicInteger(0);
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
@ -131,6 +134,8 @@ public class MeteredSessionStore<K, V>
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get());
}
@ -248,7 +253,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -260,7 +266,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -273,7 +280,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -285,7 +293,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -304,7 +313,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -323,7 +333,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -344,7 +355,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -356,7 +368,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -377,7 +390,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -447,7 +461,8 @@ public class MeteredSessionStore<K, V>
streamsMetrics,
serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
time
time,
numOpenIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);

View File

@ -325,6 +325,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
this.startNs = time.nanoseconds();
this.returnPlainValue = returnPlainValue;
numOpenIterators.incrementAndGet();
}
@Override
@ -350,6 +351,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
iter.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet();
}
}

View File

@ -264,7 +264,7 @@ public class MeteredVersionedKeyValueStore<K, V>
final QueryResult<VersionedRecordIterator<byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final MeteredMultiVersionedKeyQueryIterator<V> typedResult =
new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes));
new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes), numOpenIterators);
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
result = (QueryResult<R>) typedQueryResult;

View File

@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -74,6 +75,8 @@ public class MeteredWindowStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
private AtomicInteger numOpenIterators = new AtomicInteger(0);
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
@ -150,6 +153,8 @@ public class MeteredWindowStore<K, V>
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get());
}
@Deprecated
@ -236,7 +241,8 @@ public class MeteredWindowStore<K, V>
fetchSensor,
streamsMetrics,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -250,7 +256,8 @@ public class MeteredWindowStore<K, V>
fetchSensor,
streamsMetrics,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -269,7 +276,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -287,7 +295,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -299,7 +308,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -311,7 +321,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
time,
numOpenIterators);
}
@Override
@ -322,7 +333,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -334,7 +346,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time
time,
numOpenIterators
);
}
@ -410,7 +423,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics,
serdes::keyFrom,
getDeserializeValue(serdes, wrapped()),
time
time,
numOpenIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
@ -459,7 +473,8 @@ public class MeteredWindowStore<K, V>
fetchSensor,
streamsMetrics,
getDeserializeValue(serdes, wrapped()),
time
time,
numOpenIterators
);
final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);

View File

@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
@ -32,18 +33,22 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final Function<byte[], V> valueFrom;
private final long startNs;
private final Time time;
private final AtomicInteger numOpenIterators;
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor sensor,
final StreamsMetrics metrics,
final Function<byte[], V> valueFrom,
final Time time) {
final Time time,
final AtomicInteger numOpenIterators) {
this.iter = iter;
this.sensor = sensor;
this.metrics = metrics;
this.valueFrom = valueFrom;
this.startNs = time.nanoseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
numOpenIterators.incrementAndGet();
}
@Override
@ -63,6 +68,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
iter.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet();
}
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
@ -35,13 +36,15 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
private final Function<byte[], V> deserializeValue;
private final long startNs;
private final Time time;
private final AtomicInteger numOpenIterators;
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
final Sensor sensor,
final StreamsMetrics metrics,
final Function<byte[], K> deserializeKey,
final Function<byte[], V> deserializeValue,
final Time time) {
final Time time,
final AtomicInteger numOpenIterators) {
this.iter = iter;
this.sensor = sensor;
this.metrics = metrics;
@ -49,6 +52,8 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
this.deserializeValue = deserializeValue;
this.startNs = time.nanoseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
numOpenIterators.incrementAndGet();
}
@Override
@ -73,6 +78,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
iter.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet();
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -144,6 +145,10 @@ public class StateStoreMetrics {
private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION;
private static final String NUM_OPEN_ITERATORS = "num-open-iterators";
private static final String NUM_OPEN_ITERATORS_DESCRIPTION =
"The current number of iterators on the store that have been created, but not yet closed";
public static Sensor putSensor(final String taskId,
final String storeType,
final String storeName,
@ -404,6 +409,23 @@ public class StateStoreMetrics {
return sensor;
}
public static void addNumOpenIteratorsGauge(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> numOpenIteratorsGauge) {
streamsMetrics.addStoreLevelMutableMetric(
taskId,
storeType,
storeName,
NUM_OPEN_ITERATORS,
NUM_OPEN_ITERATORS_DESCRIPTION,
RecordingLevel.INFO,
numOpenIteratorsGauge
);
}
private static Sensor sizeOrCountSensor(final String taskId,
final String storeType,
final String storeName,

View File

@ -56,6 +56,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
@ -439,6 +440,24 @@ public class MeteredKeyValueStoreTest {
assertTrue((Double) metric.metricValue() > 0);
}
@Test
public void shouldTrackOpenIteratorsMetric() {
final StringSerializer stringSerializer = new StringSerializer();
when(inner.prefixScan(KEY, stringSerializer)).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
try (final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer)) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
}
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
}
private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}

View File

@ -58,6 +58,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
@ -604,6 +605,23 @@ public class MeteredSessionStoreTest {
assertThat(storeMetrics(), empty());
}
@Test
public void shouldTrackOpenIteratorsMetric() {
when(innerStore.backwardFetch(KEY_BYTES)).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY)) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
}
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
}
private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", this.tags));
}

View File

@ -57,6 +57,8 @@ import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -437,4 +439,21 @@ public class MeteredTimestampedKeyValueStoreTest {
throw exception;
}
}
@Test
public void shouldTrackOpenIteratorsMetric() {
when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
init();
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
try (final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
}
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
}
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
@ -26,6 +28,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -33,6 +36,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -62,9 +66,11 @@ import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -362,6 +368,28 @@ public class MeteredVersionedKeyValueStoreTest {
assertThat(store.getPosition(), is(position));
}
@Test
public void shouldTrackOpenIteratorsMetric() {
final MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
final PositionBound bound = PositionBound.unbounded();
final QueryConfig config = new QueryConfig(false);
when(inner.query(any(), any(), any())).thenReturn(
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = result.getResult()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
}
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
}
private KafkaMetric getMetric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}

View File

@ -33,10 +33,12 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
@ -55,6 +57,8 @@ import java.util.stream.Collectors;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
@ -442,6 +446,23 @@ public class MeteredWindowStoreTest {
assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L));
}
@Test
public void shouldTrackOpenIteratorsMetric() {
when(innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
}
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
}
private KafkaMetric metric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}