mirror of https://github.com/apache/kafka.git
KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#17713)
The thread that evaluates the gauge for the oldest-iterator-open-since-ms runs concurrently with threads that open/close iterators (stream threads and interactive query threads). This PR fixed a race condition between `openIterators.isEmpty()` and `openIterators.first()`, by catching a potential exception. Because we except the race condition to be rare, we rather catch the exception in favor of introducing a guard via locking. Reviewers: Matthias J. Sax <matthias@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
parent
8f63a77ba1
commit
57299cfbb1
|
@ -52,6 +52,7 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
@ -154,7 +155,13 @@ public class MeteredKeyValueStore<K, V>
|
|||
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> numOpenIterators.sum());
|
||||
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
|
||||
(config, now) -> {
|
||||
try {
|
||||
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
|
||||
} catch (final NoSuchElementException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
|
|||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
@ -124,7 +125,13 @@ public class MeteredSessionStore<K, V>
|
|||
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> numOpenIterators.sum());
|
||||
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
|
||||
(config, now) -> {
|
||||
try {
|
||||
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
|
||||
} catch (final NoSuchElementException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
|
|||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
@ -142,7 +143,13 @@ public class MeteredWindowStore<K, V>
|
|||
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> numOpenIterators.sum());
|
||||
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
|
||||
(config, now) -> {
|
||||
try {
|
||||
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
|
||||
} catch (final NoSuchElementException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue