From 5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Wed, 22 May 2024 07:29:50 +0100 Subject: [PATCH] KAFKA-15541: Add num-open-iterators metric (#15975) Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw). This new `StateStore` metric tracks the number of `Iterator` instances that have been created, but not yet closed (via `AutoCloseable#close()`). This will aid users in detecting leaked iterators, which can cause major performance problems. Reviewers: Lucas Brutschy , Matthias J. Sax --- .../state/internals/MeteredKeyValueStore.java | 9 +++++ ...MeteredMultiVersionedKeyQueryIterator.java | 14 ++++++-- .../state/internals/MeteredSessionStore.java | 35 +++++++++++++------ .../MeteredTimestampedKeyValueStore.java | 2 ++ .../MeteredVersionedKeyValueStore.java | 2 +- .../state/internals/MeteredWindowStore.java | 35 +++++++++++++------ .../internals/MeteredWindowStoreIterator.java | 8 ++++- .../MeteredWindowedKeyValueIterator.java | 8 ++++- .../internals/metrics/StateStoreMetrics.java | 22 ++++++++++++ .../internals/MeteredKeyValueStoreTest.java | 19 ++++++++++ .../internals/MeteredSessionStoreTest.java | 18 ++++++++++ .../MeteredTimestampedKeyValueStoreTest.java | 19 ++++++++++ .../MeteredVersionedKeyValueStoreTest.java | 28 +++++++++++++++ .../internals/MeteredWindowStoreTest.java | 21 +++++++++++ 14 files changed, 214 insertions(+), 26 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 57fd8cb15fe..88b49cd2180 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -52,6 +52,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -93,6 +94,8 @@ public class MeteredKeyValueStore private StreamsMetricsImpl streamsMetrics; private TaskId taskId; + protected AtomicInteger numOpenIterators = new AtomicInteger(0); + @SuppressWarnings("rawtypes") private final Map queryHandlers = mkMap( @@ -162,6 +165,8 @@ public class MeteredKeyValueStore flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics); deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); + StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> numOpenIterators.get()); } protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { @@ -460,6 +465,7 @@ public class MeteredKeyValueStore this.iter = iter; this.sensor = sensor; this.startNs = time.nanoseconds(); + numOpenIterators.incrementAndGet(); } @Override @@ -481,6 +487,7 @@ public class MeteredKeyValueStore iter.close(); } finally { sensor.record(time.nanoseconds() - startNs); + numOpenIterators.decrementAndGet(); } } @@ -504,6 +511,7 @@ public class MeteredKeyValueStore this.sensor = sensor; this.valueDeserializer = valueDeserializer; this.startNs = time.nanoseconds(); + numOpenIterators.incrementAndGet(); } @Override @@ -525,6 +533,7 @@ public class MeteredKeyValueStore iter.close(); } finally { sensor.record(time.nanoseconds() - startNs); + numOpenIterators.decrementAndGet(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java index 47f98d933c2..be695501caf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.kafka.streams.state.VersionedRecordIterator; import org.apache.kafka.streams.state.VersionedRecord; @@ -24,18 +25,25 @@ public class MeteredMultiVersionedKeyQueryIterator implements VersionedRecord private final VersionedRecordIterator iterator; private final Function, VersionedRecord> deserializeValue; - + private final AtomicInteger numOpenIterators; public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator iterator, - final Function, VersionedRecord> deserializeValue) { + final Function, VersionedRecord> deserializeValue, + final AtomicInteger numOpenIterators) { this.iterator = iterator; this.deserializeValue = deserializeValue; + this.numOpenIterators = numOpenIterators; + numOpenIterators.incrementAndGet(); } @Override public void close() { - iterator.close(); + try { + iterator.close(); + } finally { + numOpenIterators.decrementAndGet(); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index b697a2602f6..4bcbb483a31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -47,6 +47,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -70,6 +71,8 @@ public class MeteredSessionStore private InternalProcessorContext context; private TaskId taskId; + private AtomicInteger numOpenIterators = new AtomicInteger(0); + @SuppressWarnings("rawtypes") private final Map queryHandlers = mkMap( @@ -131,6 +134,8 @@ public class MeteredSessionStore flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics); removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); + StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> numOpenIterators.get()); } @@ -248,7 +253,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -260,7 +266,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -273,7 +280,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -285,7 +293,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -304,7 +313,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -323,7 +333,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -344,7 +355,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -356,7 +368,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -377,7 +390,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -447,7 +461,8 @@ public class MeteredSessionStore streamsMetrics, serdes::keyFrom, StoreQueryUtils.getDeserializeValue(serdes, wrapped()), - time + time, + numOpenIterators ); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index b5c774fef3c..d0fcb0cf0ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -325,6 +325,7 @@ public class MeteredTimestampedKeyValueStore this.valueAndTimestampDeserializer = valueAndTimestampDeserializer; this.startNs = time.nanoseconds(); this.returnPlainValue = returnPlainValue; + numOpenIterators.incrementAndGet(); } @Override @@ -350,6 +351,7 @@ public class MeteredTimestampedKeyValueStore iter.close(); } finally { sensor.record(time.nanoseconds() - startNs); + numOpenIterators.decrementAndGet(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index eaf1e20c5fe..0c929308d84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -264,7 +264,7 @@ public class MeteredVersionedKeyValueStore final QueryResult> rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { final MeteredMultiVersionedKeyQueryIterator typedResult = - new MeteredMultiVersionedKeyQueryIterator(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes)); + new MeteredMultiVersionedKeyQueryIterator(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes), numOpenIterators); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); result = (QueryResult) typedQueryResult; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 12284b48d5a..2d63e3ca7c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -74,6 +75,8 @@ public class MeteredWindowStore private InternalProcessorContext context; private TaskId taskId; + private AtomicInteger numOpenIterators = new AtomicInteger(0); + @SuppressWarnings("rawtypes") private final Map queryHandlers = mkMap( @@ -150,6 +153,8 @@ public class MeteredWindowStore fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics); flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); + StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> numOpenIterators.get()); } @Deprecated @@ -236,7 +241,8 @@ public class MeteredWindowStore fetchSensor, streamsMetrics, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -250,7 +256,8 @@ public class MeteredWindowStore fetchSensor, streamsMetrics, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -269,7 +276,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -287,7 +295,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -299,7 +308,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -311,7 +321,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time); + time, + numOpenIterators); } @Override @@ -322,7 +333,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -334,7 +346,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, serdes::valueFrom, - time + time, + numOpenIterators ); } @@ -410,7 +423,8 @@ public class MeteredWindowStore streamsMetrics, serdes::keyFrom, getDeserializeValue(serdes, wrapped()), - time + time, + numOpenIterators ); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); @@ -459,7 +473,8 @@ public class MeteredWindowStore fetchSensor, streamsMetrics, getDeserializeValue(serdes, wrapped()), - time + time, + numOpenIterators ); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java index b368b7b8f89..7f5cf99c025 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.state.WindowStoreIterator; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; class MeteredWindowStoreIterator implements WindowStoreIterator { @@ -32,18 +33,22 @@ class MeteredWindowStoreIterator implements WindowStoreIterator { private final Function valueFrom; private final long startNs; private final Time time; + private final AtomicInteger numOpenIterators; MeteredWindowStoreIterator(final WindowStoreIterator iter, final Sensor sensor, final StreamsMetrics metrics, final Function valueFrom, - final Time time) { + final Time time, + final AtomicInteger numOpenIterators) { this.iter = iter; this.sensor = sensor; this.metrics = metrics; this.valueFrom = valueFrom; this.startNs = time.nanoseconds(); this.time = time; + this.numOpenIterators = numOpenIterators; + numOpenIterators.incrementAndGet(); } @Override @@ -63,6 +68,7 @@ class MeteredWindowStoreIterator implements WindowStoreIterator { iter.close(); } finally { sensor.record(time.nanoseconds() - startNs); + numOpenIterators.decrementAndGet(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java index 68093801a29..a9354c863c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; class MeteredWindowedKeyValueIterator implements KeyValueIterator, V> { @@ -35,13 +36,15 @@ class MeteredWindowedKeyValueIterator implements KeyValueIterator deserializeValue; private final long startNs; private final Time time; + private final AtomicInteger numOpenIterators; MeteredWindowedKeyValueIterator(final KeyValueIterator, byte[]> iter, final Sensor sensor, final StreamsMetrics metrics, final Function deserializeKey, final Function deserializeValue, - final Time time) { + final Time time, + final AtomicInteger numOpenIterators) { this.iter = iter; this.sensor = sensor; this.metrics = metrics; @@ -49,6 +52,8 @@ class MeteredWindowedKeyValueIterator implements KeyValueIterator implements KeyValueIterator numOpenIteratorsGauge) { + streamsMetrics.addStoreLevelMutableMetric( + taskId, + storeType, + storeName, + NUM_OPEN_ITERATORS, + NUM_OPEN_ITERATORS_DESCRIPTION, + RecordingLevel.INFO, + numOpenIteratorsGauge + ); + + } + private static Sensor sizeOrCountSensor(final String taskId, final String storeType, final String storeName, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 4482363de63..00151c67981 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -56,6 +56,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThan; @@ -439,6 +440,24 @@ public class MeteredKeyValueStoreTest { assertTrue((Double) metric.metricValue() > 0); } + @Test + public void shouldTrackOpenIteratorsMetric() { + final StringSerializer stringSerializer = new StringSerializer(); + when(inner.prefixScan(KEY, stringSerializer)).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertThat(openIteratorsMetric, not(nullValue())); + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + + try (final KeyValueIterator iterator = metered.prefixScan(KEY, stringSerializer)) { + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + } + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + } + private KafkaMetric metric(final MetricName metricName) { return this.metrics.metric(metricName); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index f1c64c2d4fb..b7c99032f89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; @@ -604,6 +605,23 @@ public class MeteredSessionStoreTest { assertThat(storeMetrics(), empty()); } + @Test + public void shouldTrackOpenIteratorsMetric() { + when(innerStore.backwardFetch(KEY_BYTES)).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertThat(openIteratorsMetric, not(nullValue())); + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + + try (final KeyValueIterator, String> iterator = store.backwardFetch(KEY)) { + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + } + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + } + private KafkaMetric metric(final String name) { return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", this.tags)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index b2febbd517c..f629acfc9a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -57,6 +57,8 @@ import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -437,4 +439,21 @@ public class MeteredTimestampedKeyValueStoreTest { throw exception; } } + + @Test + public void shouldTrackOpenIteratorsMetric() { + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertThat(openIteratorsMetric, not(nullValue())); + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + + try (final KeyValueIterator> iterator = metered.all()) { + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + } + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index b9d711abab5..99f76839041 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThan; @@ -26,6 +28,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -33,6 +36,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -62,9 +66,11 @@ import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.ResultOrder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.VersionedBytesStore; import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -362,6 +368,28 @@ public class MeteredVersionedKeyValueStoreTest { assertThat(store.getPosition(), is(position)); } + @Test + public void shouldTrackOpenIteratorsMetric() { + final MultiVersionedKeyQuery query = MultiVersionedKeyQuery.withKey(KEY); + final PositionBound bound = PositionBound.unbounded(); + final QueryConfig config = new QueryConfig(false); + when(inner.query(any(), any(), any())).thenReturn( + QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY))); + + final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators"); + assertThat(openIteratorsMetric, not(nullValue())); + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + + final QueryResult> result = store.query(query, bound, config); + + try (final VersionedRecordIterator iterator = result.getResult()) { + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + } + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + } + private KafkaMetric getMetric(final String name) { return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 952a52dbff2..3997a5e549e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -33,10 +33,12 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; @@ -55,6 +57,8 @@ import java.util.stream.Collectors; import static java.time.Instant.ofEpochMilli; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThan; @@ -442,6 +446,23 @@ public class MeteredWindowStoreTest { assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L)); } + @Test + public void shouldTrackOpenIteratorsMetric() { + when(innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator()); + store.init((StateStoreContext) context, store); + + final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); + assertThat(openIteratorsMetric, not(nullValue())); + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + + try (final KeyValueIterator, String> iterator = store.all()) { + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + } + + assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + } + private KafkaMetric metric(final String name) { return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags)); }