KAFKA-15541: Use LongAdder instead of AtomicInteger (#16076)

`LongAdder` performs better than `AtomicInteger` when under contention
from many threads. Since it's possible that many Interactive Query
threads could create a large number of `KeyValueIterator`s, we don't
want contention on a metric to be a performance bottleneck.

The trade-off is memory, as `LongAdder` uses more memory to space out
independent counters across different cache lines. In practice, I don't
expect this to cause too many problems, as we're only constructing 1
per-store.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Nick Telford 2024-05-25 20:22:56 +01:00 committed by GitHub
parent a8d166c00e
commit d9ee9c96dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 46 additions and 46 deletions

View File

@ -52,7 +52,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -95,7 +95,7 @@ public class MeteredKeyValueStore<K, V>
private StreamsMetricsImpl streamsMetrics; private StreamsMetricsImpl streamsMetrics;
private TaskId taskId; private TaskId taskId;
protected AtomicInteger numOpenIterators = new AtomicInteger(0); protected LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers = private final Map<Class, QueryHandler> queryHandlers =
@ -168,7 +168,7 @@ public class MeteredKeyValueStore<K, V>
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get()); (config, now) -> numOpenIterators.sum());
} }
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) { protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@ -467,7 +467,7 @@ public class MeteredKeyValueStore<K, V>
this.iter = iter; this.iter = iter;
this.sensor = sensor; this.sensor = sensor;
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@Override @Override
@ -491,7 +491,7 @@ public class MeteredKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }
@ -515,7 +515,7 @@ public class MeteredKeyValueStore<K, V>
this.sensor = sensor; this.sensor = sensor;
this.valueDeserializer = valueDeserializer; this.valueDeserializer = valueDeserializer;
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@Override @Override
@ -539,7 +539,7 @@ public class MeteredKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.streams.state.internals; package org.apache.kafka.streams.state.internals;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
@ -28,7 +28,7 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
private final VersionedRecordIterator<byte[]> iterator; private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue; private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
private final AtomicInteger numOpenIterators; private final LongAdder numOpenIterators;
private final Sensor sensor; private final Sensor sensor;
private final Time time; private final Time time;
private final long startNs; private final long startNs;
@ -37,14 +37,14 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
final Sensor sensor, final Sensor sensor,
final Time time, final Time time,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue, final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
final AtomicInteger numOpenIterators) { final LongAdder numOpenIterators) {
this.iterator = iterator; this.iterator = iterator;
this.deserializeValue = deserializeValue; this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators; this.numOpenIterators = numOpenIterators;
this.sensor = sensor; this.sensor = sensor;
this.time = time; this.time = time;
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@ -54,7 +54,7 @@ public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecord
iterator.close(); iterator.close();
} finally { } finally {
sensor.record(time.nanoseconds() - startNs); sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }

View File

@ -47,7 +47,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
@ -72,7 +72,7 @@ public class MeteredSessionStore<K, V>
private InternalProcessorContext<?, ?> context; private InternalProcessorContext<?, ?> context;
private TaskId taskId; private TaskId taskId;
private AtomicInteger numOpenIterators = new AtomicInteger(0); private LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers = private final Map<Class, QueryHandler> queryHandlers =
@ -137,7 +137,7 @@ public class MeteredSessionStore<K, V>
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get()); (config, now) -> numOpenIterators.sum());
} }

View File

@ -325,7 +325,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
this.valueAndTimestampDeserializer = valueAndTimestampDeserializer; this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
this.returnPlainValue = returnPlainValue; this.returnPlainValue = returnPlainValue;
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@Override @Override
@ -353,7 +353,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
sensor.record(duration); sensor.record(duration);
iteratorDurationSensor.record(duration); iteratorDurationSensor.record(duration);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }

View File

@ -49,7 +49,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
@ -76,7 +76,7 @@ public class MeteredWindowStore<K, V>
private InternalProcessorContext<?, ?> context; private InternalProcessorContext<?, ?> context;
private TaskId taskId; private TaskId taskId;
private AtomicInteger numOpenIterators = new AtomicInteger(0); private LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers = private final Map<Class, QueryHandler> queryHandlers =
@ -156,7 +156,7 @@ public class MeteredWindowStore<K, V>
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get()); (config, now) -> numOpenIterators.sum());
} }
@Deprecated @Deprecated

View File

@ -22,7 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> { class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
@ -34,7 +34,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final Function<byte[], V> valueFrom; private final Function<byte[], V> valueFrom;
private final long startNs; private final long startNs;
private final Time time; private final Time time;
private final AtomicInteger numOpenIterators; private final LongAdder numOpenIterators;
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter, MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor operationSensor, final Sensor operationSensor,
@ -42,7 +42,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
final StreamsMetrics metrics, final StreamsMetrics metrics,
final Function<byte[], V> valueFrom, final Function<byte[], V> valueFrom,
final Time time, final Time time,
final AtomicInteger numOpenIterators) { final LongAdder numOpenIterators) {
this.iter = iter; this.iter = iter;
this.operationSensor = operationSensor; this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor; this.iteratorSensor = iteratorSensor;
@ -51,7 +51,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
this.time = time; this.time = time;
this.numOpenIterators = numOpenIterators; this.numOpenIterators = numOpenIterators;
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@Override @Override
@ -73,7 +73,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
operationSensor.record(duration); operationSensor.record(duration);
iteratorSensor.record(duration); iteratorSensor.record(duration);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }

View File

@ -24,7 +24,7 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function; import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> { class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
@ -37,7 +37,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
private final Function<byte[], V> deserializeValue; private final Function<byte[], V> deserializeValue;
private final long startNs; private final long startNs;
private final Time time; private final Time time;
private final AtomicInteger numOpenIterators; private final LongAdder numOpenIterators;
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter, MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
final Sensor operationSensor, final Sensor operationSensor,
@ -46,7 +46,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
final Function<byte[], K> deserializeKey, final Function<byte[], K> deserializeKey,
final Function<byte[], V> deserializeValue, final Function<byte[], V> deserializeValue,
final Time time, final Time time,
final AtomicInteger numOpenIterators) { final LongAdder numOpenIterators) {
this.iter = iter; this.iter = iter;
this.operationSensor = operationSensor; this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor; this.iteratorSensor = iteratorSensor;
@ -56,7 +56,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
this.startNs = time.nanoseconds(); this.startNs = time.nanoseconds();
this.time = time; this.time = time;
this.numOpenIterators = numOpenIterators; this.numOpenIterators = numOpenIterators;
numOpenIterators.incrementAndGet(); numOpenIterators.increment();
} }
@Override @Override
@ -83,7 +83,7 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
final long duration = time.nanoseconds() - startNs; final long duration = time.nanoseconds() - startNs;
operationSensor.record(duration); operationSensor.record(duration);
iteratorSensor.record(duration); iteratorSensor.record(duration);
numOpenIterators.decrementAndGet(); numOpenIterators.decrement();
} }
} }

View File

@ -438,7 +438,7 @@ public class StateStoreMetrics {
final String storeType, final String storeType,
final String storeName, final String storeName,
final StreamsMetricsImpl streamsMetrics, final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> numOpenIteratorsGauge) { final Gauge<Long> numOpenIteratorsGauge) {
streamsMetrics.addStoreLevelMutableMetric( streamsMetrics.addStoreLevelMutableMetric(
taskId, taskId,
storeType, storeType,

View File

@ -451,13 +451,13 @@ public class MeteredKeyValueStoreTest {
final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue())); assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer)) { try (final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer)) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
} }
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
} }
@Test @Test

View File

@ -615,13 +615,13 @@ public class MeteredSessionStoreTest {
final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue())); assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY)) { try (final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY)) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
} }
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
} }
@Test @Test

View File

@ -449,13 +449,13 @@ public class MeteredTimestampedKeyValueStoreTest {
final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue())); assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all()) { try (final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
} }
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
} }
@Test @Test

View File

@ -380,15 +380,15 @@ public class MeteredVersionedKeyValueStoreTest {
final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators"); final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue())); assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config); final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = result.getResult()) { try (final VersionedRecordIterator<String> iterator = result.getResult()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
} }
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
} }
@Test @Test

View File

@ -456,13 +456,13 @@ public class MeteredWindowStoreTest {
final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue())); assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) { try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
} }
assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
} }
@Test @Test