From 57299cfbb1f33b9cf603473c96b2c77e8a5b6bf0 Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Tue, 19 Nov 2024 01:45:49 +0000 Subject: [PATCH] KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713) The thread that evaluates the gauge for the oldest-iterator-open-since-ms runs concurrently with threads that open/close iterators (stream threads and interactive query threads). This PR fixed a race condition between `openIterators.isEmpty()` and `openIterators.first()`, by catching a potential exception. Because we except the race condition to be rare, we rather catch the exception in favor of introducing a guard via locking. Reviewers: Matthias J. Sax , Anna Sophie Blee-Goldman --- .../streams/state/internals/MeteredKeyValueStore.java | 9 ++++++++- .../streams/state/internals/MeteredSessionStore.java | 9 ++++++++- .../streams/state/internals/MeteredWindowStore.java | 9 ++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index e8e613acdbf..0825de0af17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -52,6 +52,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -154,7 +155,13 @@ public class MeteredKeyValueStore StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index a6546c1edb5..266e8a7e4bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.Comparator; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -124,7 +125,13 @@ public class MeteredSessionStore StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index e59665fb2eb..2ed95ac9b3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.util.Comparator; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -142,7 +143,13 @@ public class MeteredWindowStore StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); }