diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java new file mode 100644 index 00000000000..5e2307d0ac6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.MeteredIterator; +import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; + +import java.util.Comparator; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.LongAdder; + +public class OpenIterators { + private final TaskId taskId; + private final String metricsScope; + private final String name; + private final StreamsMetricsImpl streamsMetrics; + + private final LongAdder numOpenIterators = new LongAdder(); + private final NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); + + private MetricName metricName; + + public OpenIterators(final TaskId taskId, + final String metricsScope, + final String name, + final StreamsMetricsImpl streamsMetrics) { + this.taskId = taskId; + this.metricsScope = metricsScope; + this.name = name; + this.streamsMetrics = streamsMetrics; + } + + public void add(final MeteredIterator iterator) { + openIterators.add(iterator); + numOpenIterators.increment(); + + if (numOpenIterators.intValue() == 1) { + metricName = StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name, streamsMetrics, + (config, now) -> openIterators.first().startTimestamp() + ); + } + } + + public void remove(final MeteredIterator iterator) { + if (numOpenIterators.intValue() == 1) { + streamsMetrics.removeMetric(metricName); + } + numOpenIterators.decrement(); + openIterators.remove(iterator); + } + + public long sum() { + return numOpenIterators.sum(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 0dd3f77f199..7c22c94e9af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -335,6 +335,10 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } + public void removeMetric(final MetricName metricName) { + metrics.removeMetric(metricName); + } + public Map taskLevelTagMap(final String threadId, final String taskId) { final Map tagMap = new LinkedHashMap<>(); tagMap.put(THREAD_ID_TAG, threadId); @@ -517,13 +521,13 @@ public class StreamsMetricsImpl implements StreamsMetrics { return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, recordingLevel, parents); } - public void addStoreLevelMutableMetric(final String taskId, - final String metricsScope, - final String storeName, - final String name, - final String description, - final RecordingLevel recordingLevel, - final Gauge valueProvider) { + public MetricName addStoreLevelMutableMetric(final String taskId, + final String metricsScope, + final String storeName, + final String name, + final String description, + final RecordingLevel recordingLevel, + final Gauge valueProvider) { final MetricName metricName = metrics.metricName( name, STATE_STORE_LEVEL_GROUP, @@ -535,6 +539,8 @@ public class StreamsMetricsImpl implements StreamsMetrics { final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName); storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); } + + return metricName; } public final void removeAllStoreLevelSensorsAndMetrics(final String taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 7622f904c17..0962033b7ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.internals.metrics.OpenIterators; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.StateStore; @@ -48,14 +49,9 @@ import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler; import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Objects; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -96,8 +92,7 @@ public class MeteredKeyValueStore private StreamsMetricsImpl streamsMetrics; private TaskId taskId; - protected LongAdder numOpenIterators = new LongAdder(); - protected NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); + protected OpenIterators openIterators; @SuppressWarnings("rawtypes") private final Map queryHandlers = @@ -153,13 +148,8 @@ public class MeteredKeyValueStore e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> numOpenIterators.sum()); - StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> { - final Iterator openIteratorsIterator = openIterators.iterator(); - return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null; - } - ); + (config, now) -> openIterators.sum()); + openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics); } protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { @@ -445,7 +435,6 @@ public class MeteredKeyValueStore this.sensor = sensor; this.startTimestamp = time.milliseconds(); this.startNs = time.nanoseconds(); - numOpenIterators.increment(); openIterators.add(this); } @@ -475,7 +464,6 @@ public class MeteredKeyValueStore final long duration = time.nanoseconds() - startNs; sensor.record(duration); iteratorDurationSensor.record(duration); - numOpenIterators.decrement(); openIterators.remove(this); } } @@ -502,7 +490,6 @@ public class MeteredKeyValueStore this.valueDeserializer = valueDeserializer; this.startTimestamp = time.milliseconds(); this.startNs = time.nanoseconds(); - numOpenIterators.increment(); openIterators.add(this); } @@ -532,7 +519,6 @@ public class MeteredKeyValueStore final long duration = time.nanoseconds() - startNs; sensor.record(duration); iteratorDurationSensor.record(duration); - numOpenIterators.decrement(); openIterators.remove(this); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java index b1bb0b63158..b27e6a78d86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java @@ -18,39 +18,34 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.internals.metrics.OpenIterators; import org.apache.kafka.streams.state.VersionedRecord; import org.apache.kafka.streams.state.VersionedRecordIterator; -import java.util.Set; -import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; class MeteredMultiVersionedKeyQueryIterator implements VersionedRecordIterator, MeteredIterator { private final VersionedRecordIterator iterator; private final Function, VersionedRecord> deserializeValue; - private final LongAdder numOpenIterators; private final Sensor sensor; private final Time time; private final long startNs; private final long startTimestampMs; - private final Set openIterators; + private final OpenIterators openIterators; public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator iterator, final Sensor sensor, final Time time, final Function, VersionedRecord> deserializeValue, - final LongAdder numOpenIterators, - final Set openIterators) { + final OpenIterators openIterators) { this.iterator = iterator; this.deserializeValue = deserializeValue; - this.numOpenIterators = numOpenIterators; this.openIterators = openIterators; this.sensor = sensor; this.time = time; this.startNs = time.nanoseconds(); this.startTimestampMs = time.milliseconds(); - numOpenIterators.increment(); openIterators.add(this); } @@ -65,7 +60,6 @@ class MeteredMultiVersionedKeyQueryIterator implements VersionedRecordIterato iterator.close(); } finally { sensor.record(time.nanoseconds() - startNs); - numOpenIterators.decrement(); openIterators.remove(this); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 6f90ef56d86..0cfb8936a5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -326,7 +326,6 @@ public class MeteredTimestampedKeyValueStore this.startNs = time.nanoseconds(); this.startTimestampMs = time.milliseconds(); this.returnPlainValue = returnPlainValue; - numOpenIterators.increment(); openIterators.add(this); } @@ -360,7 +359,6 @@ public class MeteredTimestampedKeyValueStore final long duration = time.nanoseconds() - startNs; sensor.record(duration); iteratorDurationSensor.record(duration); - numOpenIterators.decrement(); openIterators.remove(this); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 66eb3206def..6afb4d1531d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -269,7 +269,6 @@ public class MeteredVersionedKeyValueStore iteratorDurationSensor, time, StoreQueryUtils.deserializeValue(plainValueSerdes), - numOpenIterators, openIterators ); final QueryResult> typedQueryResult = diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java index 2422fa9d5e3..bb60c304684 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals.metrics; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; @@ -455,12 +456,12 @@ public class StateStoreMetrics { } - public static void addOldestOpenIteratorGauge(final String taskId, - final String storeType, - final String storeName, - final StreamsMetricsImpl streamsMetrics, - final Gauge oldestOpenIteratorGauge) { - streamsMetrics.addStoreLevelMutableMetric( + public static MetricName addOldestOpenIteratorGauge(final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics, + final Gauge oldestOpenIteratorGauge) { + return streamsMetrics.addStoreLevelMutableMetric( taskId, storeType, storeName, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 4a8c891355d..294af3944f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -525,15 +525,16 @@ public class MeteredKeyValueStoreTest { when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); init(); - final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); - assertThat(oldestIteratorTimestampMetric, not(nullValue())); - - assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue()); + KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); KeyValueIterator second = null; final long secondTimestamp; try { try (final KeyValueIterator unused = metered.all()) { + oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, not(nullValue())); + final long oldestTimestamp = mockTime.milliseconds(); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); mockTime.sleep(100); @@ -553,7 +554,8 @@ public class MeteredKeyValueStoreTest { } } - assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); + oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); } private KafkaMetric metric(final MetricName metricName) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index fa42cb07283..2e3c470387c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -503,15 +503,16 @@ public class MeteredTimestampedKeyValueStoreTest { when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); init(); - final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); - assertThat(oldestIteratorTimestampMetric, not(nullValue())); - - assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue()); + KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); KeyValueIterator> second = null; final long secondTimestamp; try { try (final KeyValueIterator> unused = metered.all()) { + oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, not(nullValue())); + final long oldestTimestamp = mockTime.milliseconds(); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); mockTime.sleep(100); @@ -531,6 +532,7 @@ public class MeteredTimestampedKeyValueStoreTest { } } - assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); + oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index f0a7f23c09c..8e8e02b2722 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -426,16 +426,17 @@ public class MeteredVersionedKeyValueStoreTest { when(inner.query(any(), any(), any())).thenReturn( QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY))); - final KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms"); - assertThat(oldestIteratorTimestampMetric, not(nullValue())); - - assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue()); + KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); final QueryResult> first = store.query(query, bound, config); VersionedRecordIterator secondIterator = null; final long secondTime; try { try (final VersionedRecordIterator unused = first.getResult()) { + oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, not(nullValue())); + final long oldestTimestamp = mockTime.milliseconds(); assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); mockTime.sleep(100); @@ -457,7 +458,8 @@ public class MeteredVersionedKeyValueStoreTest { } } - assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue()); + oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, nullValue()); } private KafkaMetric getMetric(final String name) {