KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
CI / build (push) Waiting to run Details

- Move the restore time calculation to ChangelogMetadata.
- Introduce a new internal interface to propagate the calculated value to the
stores, avoiding modifications to the public interface.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Nikita Shupletsov 2025-10-02 21:24:36 -07:00 committed by GitHub
parent 8468317dac
commit 28e7803037
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 196 additions and 32 deletions

View File

@ -112,6 +112,9 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -685,6 +688,52 @@ public class RestoreIntegrationTest {
}
}
/**
 * Checks that the "restore-latency-max" metric reported for each task is close to the restore
 * duration observed independently by a {@code TrackingStateRestoreListener}.
 *
 * The test is run twice: once with the default configuration and once with
 * {@code GROUP_PROTOCOL_CONFIG} set to {@code GroupProtocol.STREAMS}.
 *
 * @param useNewProtocol whether to configure the application with the STREAMS group protocol
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = props();
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
}
// the recording level is raised to DEBUG before the restore metric is read below
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
// prepare the state that is to be restored before the application is started
createStateForRestoration(inputStream, 10000);
final CountDownLatch shutdownLatch = new CountDownLatch(1);
builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
.toStream()
.foreach((key, value) -> {
// release the latch once numberOfKeys records have been seen downstream
if (numReceived.incrementAndGet() == numberOfKeys) {
shutdownLatch.countDown();
}
});
kafkaStreams = new KafkaStreams(builder.build(), props);
final AtomicLong restored = new AtomicLong(0);
// the listener records its own start/end timestamps per changelog partition
final TrackingStateRestoreListener restoreListener = new TrackingStateRestoreListener(restored);
kafkaStreams.setGlobalStateRestoreListener(restoreListener);
kafkaStreams.start();
assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
assertThat(numReceived.get(), equalTo(numberOfKeys));
// collect the restore-latency-max value per "task-id" tag, truncated from Double to long
final Map<String, Long> taskIdToMetricValue = kafkaStreams.metrics().entrySet().stream()
.filter(e -> e.getKey().name().equals("restore-latency-max"))
.collect(Collectors.toMap(e -> e.getKey().tags().get("task-id"), e -> ((Double) e.getValue().metricValue()).longValue()));
for (final Map.Entry<TopicPartition, Long> entry : restoreListener.changelogToRestoreTime().entrySet()) {
// the metric must lie within +/- one second (expressed in nanoseconds) of the listener's measurement
final long lowerBound = entry.getValue() - TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
final long upperBound = entry.getValue() + TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
// the metric is looked up under the task id "0_<partition>" of the changelog partition
assertThat(taskIdToMetricValue.get("0_" + entry.getKey().partition()), allOf(greaterThanOrEqualTo(lowerBound), lessThanOrEqualTo(upperBound)));
}
}
private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
final String outputTopic) throws Exception {
final Properties consumerProperties = new Properties();
@ -971,4 +1020,4 @@ public class RestoreIntegrationTest {
}
}
}
}
}

View File

@ -1337,6 +1337,8 @@ public class IntegrationTestUtils {
public final Map<TopicPartition, AtomicLong> changelogToStartOffset = new ConcurrentHashMap<>();
public final Map<TopicPartition, AtomicLong> changelogToEndOffset = new ConcurrentHashMap<>();
public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap<>();
private final Map<TopicPartition, AtomicLong> changelogToRestoreStartTime = new ConcurrentHashMap<>();
private final Map<TopicPartition, AtomicLong> changelogToRestoreEndTime = new ConcurrentHashMap<>();
private final AtomicLong restored;
public TrackingStateRestoreListener() {
@ -1355,6 +1357,7 @@ public class IntegrationTestUtils {
changelogToStartOffset.put(topicPartition, new AtomicLong(startingOffset));
changelogToEndOffset.put(topicPartition, new AtomicLong(endingOffset));
changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L));
changelogToRestoreStartTime.put(topicPartition, new AtomicLong(System.nanoTime()));
}
@Override
@ -1372,6 +1375,7 @@ public class IntegrationTestUtils {
if (restored != null) {
restored.addAndGet(totalRestored);
}
changelogToRestoreEndTime.put(topicPartition, new AtomicLong(System.nanoTime()));
}
public long totalNumRestored() {
@ -1381,6 +1385,11 @@ public class IntegrationTestUtils {
}
return totalNumRestored;
}
/**
 * Computes the observed restore duration per changelog partition as the difference between
 * the recorded restore end timestamp and the recorded restore start timestamp.
 *
 * A changelog for which a start timestamp was recorded but no end timestamp has been recorded
 * yet is omitted from the result; dereferencing its missing end timestamp would otherwise
 * throw a {@link NullPointerException}.
 *
 * @return a map from changelog partition to restore duration in nanoseconds, containing only
 *         changelogs with both a start and an end timestamp
 */
public Map<TopicPartition, Long> changelogToRestoreTime() {
    return changelogToRestoreStartTime.entrySet().stream()
        // entries are never removed from the end-time map, so containsKey followed by get is safe
        .filter(e -> changelogToRestoreEndTime.containsKey(e.getKey()))
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> changelogToRestoreEndTime.get(e.getKey()).get() - e.getValue().get()));
}
}
public static class TrackingStandbyUpdateListener implements StandbyUpdateListener {

View File

@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.MeteredStateStore;
import org.slf4j.Logger;
@ -138,6 +139,8 @@ public class StoreChangelogReader implements ChangelogReader {
// either due to limit offset (standby) or committed end offset (active)
private int bufferedLimitIndex;
private long restoreStartTimeNs;
private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) {
this.changelogState = ChangelogState.REGISTERED;
this.storeMetadata = storeMetadata;
@ -188,6 +191,10 @@ public class StoreChangelogReader implements ChangelogReader {
/**
 * @return the current value of the {@code bufferedLimitIndex} field of this changelog metadata
 */
int bufferedLimitIndex() {
return bufferedLimitIndex;
}
/**
 * Computes how long the restoration of this changelog took.
 *
 * @param restoreEndTimeNs the timestamp, in nanoseconds, at which the restoration ended
 * @return the difference between {@code restoreEndTimeNs} and the stored restore start
 *         timestamp, in nanoseconds
 */
long calculateRestoreTime(final long restoreEndTimeNs) {
    final long elapsedNs = restoreEndTimeNs - restoreStartTimeNs;
    return elapsedNs;
}
}
private static final long DEFAULT_OFFSET_UPDATE_MS = Duration.ofMinutes(5L).toMillis();
@ -695,6 +702,9 @@ public class StoreChangelogReader implements ChangelogReader {
changelogMetadata.transitTo(ChangelogState.COMPLETED);
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
if (storeMetadata.store() instanceof MeteredStateStore) {
((MeteredStateStore) storeMetadata.store()).recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds()));
}
try {
stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
@ -1026,6 +1036,7 @@ public class StoreChangelogReader implements ChangelogReader {
// no records to restore; in this case we just initialize the sensor to zero
final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
task.recordRestoration(time, recordsToRestore, true);
changelogMetadata.restoreStartTimeNs = time.nanoseconds();
} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
try {
standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);

View File

@ -69,7 +69,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
implements KeyValueStore<K, V> {
implements KeyValueStore<K, V>, MeteredStateStore {
final Serde<K> keySerde;
final Serde<V> valueSerde;
@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V>
protected InternalProcessorContext<?, ?> internalContext;
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
private Sensor restoreSensor;
protected OpenIterators openIterators;
@ -128,11 +129,10 @@ public class MeteredKeyValueStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
super.init(stateStoreContext, root);
}
private void registerMetrics() {
@ -152,6 +152,11 @@ public class MeteredKeyValueStore<K, V>
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
}
/**
 * Records the given restore duration on this store's restore sensor.
 *
 * @param restoreTimeNs the restore duration in nanoseconds
 */
@Override
public void recordRestoreTime(final long restoreTimeNs) {
restoreSensor.record(restoreTimeNs);
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}

View File

@ -57,7 +57,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
public class MeteredSessionStore<K, V>
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
implements SessionStore<K, V> {
implements SessionStore<K, V>, MeteredStateStore {
private final String metricsScope;
private final Serde<K> keySerde;
@ -73,6 +73,7 @@ public class MeteredSessionStore<K, V>
private Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
private Sensor restoreSensor;
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@ -108,11 +109,9 @@ public class MeteredSessionStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
super.init(stateStoreContext, root);
}
private void registerMetrics() {
@ -132,6 +131,11 @@ public class MeteredSessionStore<K, V>
);
}
/**
 * Records the given restore duration on this store's restore sensor.
 *
 * @param restoreTimeNs the restore duration in nanoseconds
 */
@Override
public void recordRestoreTime(final long restoreTimeNs) {
restoreSensor.record(restoreTimeNs);
}
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
/**
 * Internal interface for stores that can be told how long their restoration took, so that the
 * value can be recorded in their restore metrics.
 */
public interface MeteredStateStore {
/**
 * Records the time it took to restore this store.
 *
 * @param restoreTimeNs the restore duration in nanoseconds
 */
void recordRestoreTime(final long restoreTimeNs);
}

View File

@ -60,7 +60,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
implements WindowStore<K, V> {
implements WindowStore<K, V>, MeteredStateStore {
private final long windowSizeMs;
private final String metricsScope;
@ -76,6 +76,7 @@ public class MeteredWindowStore<K, V>
private Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
private Sensor restoreSensor;
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@ -124,8 +125,8 @@ public class MeteredWindowStore<K, V>
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
@ -150,6 +151,11 @@ public class MeteredWindowStore<K, V>
);
}
/**
 * Records the given restore duration on this store's restore sensor.
 *
 * @param restoreTimeNs the restore duration in nanoseconds
 */
@Override
public void recordRestoreTime(final long restoreTimeNs) {
restoreSensor.record(restoreTimeNs);
}
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

View File

@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
@ -89,7 +90,9 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@ -1364,6 +1367,58 @@ public class StoreChangelogReaderTest {
}
}
/**
 * Verifies that {@code recordRestoreTime} is invoked on a metered store once
 * {@code changelogReader.restore(...)} has been run against a changelog whose end offset is 2.
 */
@Test
public void shouldCallRecordRestoreTimeAtTheEndOfRestore() {
setupActiveStateManager();
// a mocked MeteredKeyValueStore is returned by the store metadata so the call can be verified
final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
when(storeMetadata.changelogPartition()).thenReturn(tp);
when(storeMetadata.store()).thenReturn(meteredStateStore);
when(meteredStateStore.name()).thenReturn(storeName);
final TaskId taskId = new TaskId(0, 0);
when(storeMetadata.offset()).thenReturn(0L);
when(activeStateManager.taskId()).thenReturn(taskId);
setupConsumer(2, tp);
// both the consumer and the admin client report an end offset of 2 for the changelog
consumer.updateEndOffsets(Collections.singletonMap(tp, 2L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 2L));
changelogReader.register(tp, activeStateManager);
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
verify(meteredStateStore).recordRestoreTime(anyLong());
}
/**
 * Verifies that {@code recordRestoreTime} is never invoked on a metered store when
 * {@code changelogReader.restore(...)} is run against a changelog whose reported end offset (3)
 * is higher than in the completing case, so the restoration is not finished.
 */
@Test
public void shouldNotCallRecordRestoreTimeIfRestoreDoesNotComplete() {
setupActiveStateManager();
// a mocked MeteredKeyValueStore is returned by the store metadata so the absence of the call can be verified
final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
when(storeMetadata.changelogPartition()).thenReturn(tp);
when(storeMetadata.store()).thenReturn(meteredStateStore);
when(meteredStateStore.name()).thenReturn(storeName);
final TaskId taskId = new TaskId(0, 0);
when(storeMetadata.offset()).thenReturn(0L);
when(activeStateManager.taskId()).thenReturn(taskId);
setupConsumer(2, tp);
// the consumer is set up with 2 messages, but the reported end offset is 3
consumer.updateEndOffsets(Collections.singletonMap(tp, 3L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 3L));
changelogReader.register(tp, activeStateManager);
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
verify(meteredStateStore, never()).recordRestoreTime(anyLong());
}
private void setupConsumer(final long messages, final TopicPartition topicPartition) {
assignPartition(messages, topicPartition);
addRecords(messages, topicPartition);

View File

@ -58,7 +58,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -210,16 +209,19 @@ public class MeteredKeyValueStoreTest {
}
@Test
public void shouldRecordRestoreLatencyOnInit() {
public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
setUp();
doNothing().when(inner).init(context, metered);
init();
final long restoreTimeNs = 1000L;
metered.recordRestoreTime(restoreTimeNs);
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("restore-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
final KafkaMetric metric = metric("restore-latency-max");
assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test

View File

@ -458,14 +458,17 @@ public class MeteredSessionStoreTest {
}
@Test
public void shouldRecordRestoreTimeOnInit() {
public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
setUp();
init();
final long restoreTimeNs = 1000L;
store.recordRestoreTime(restoreTimeNs);
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("restore-rate");
assertTrue((Double) metric.metricValue() > 0);
final KafkaMetric metric = metric("restore-latency-max");
assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test

View File

@ -183,13 +183,6 @@ public class MeteredVersionedKeyValueStoreTest {
verify(valueSerializer).serialize(changelogTopicName, VALUE);
}
@Test
public void shouldRecordMetricsOnInit() {
// init is called in setUp(). it suffices to verify one restore metric since all restore
// metrics are recorded by the same sensor, and the sensor is tested elsewhere.
assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnPut() {
when(inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP)).thenReturn(PUT_RETURN_CODE_VALID_TO_UNDEFINED);
@ -473,4 +466,4 @@ public class MeteredVersionedKeyValueStoreTest {
.filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags))
.collect(Collectors.toList());
}
}
}

View File

@ -210,14 +210,19 @@ public class MeteredWindowStoreTest {
}
@Test
public void shouldRecordRestoreLatencyOnInit() {
public void shouldRecordRestoreLatencyOnRecordRestoreTime() {
setUp();
doNothing().when(innerStoreMock).init(context, store);
store.init(context, store);
final long restoreTimeNs = 1000L;
store.recordRestoreTime(restoreTimeNs);
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("restore-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
final KafkaMetric metric = metric("restore-latency-max");
assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs));
}
@Test