diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index e85ac344157..7ffeb6ac035 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -112,6 +112,9 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -685,6 +688,52 @@ public class RestoreIntegrationTest { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exception { + final AtomicInteger numReceived = new AtomicInteger(0); + final StreamsBuilder builder = new StreamsBuilder(); + + final Properties props = props(); + + if (useNewProtocol) { + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); + } + + props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); + + createStateForRestoration(inputStream, 10000); + + final CountDownLatch shutdownLatch = new CountDownLatch(1); + builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store")) + .toStream() + .foreach((key, value) -> { + if (numReceived.incrementAndGet() == numberOfKeys) { + shutdownLatch.countDown(); + } + }); + + kafkaStreams = new KafkaStreams(builder.build(), props); + + final AtomicLong restored = new AtomicLong(0); + final TrackingStateRestoreListener restoreListener = new TrackingStateRestoreListener(restored); + kafkaStreams.setGlobalStateRestoreListener(restoreListener); + kafkaStreams.start(); + + assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS)); + assertThat(numReceived.get(), equalTo(numberOfKeys)); + + final Map taskIdToMetricValue = kafkaStreams.metrics().entrySet().stream() + .filter(e -> e.getKey().name().equals("restore-latency-max")) + .collect(Collectors.toMap(e -> e.getKey().tags().get("task-id"), e -> ((Double) e.getValue().metricValue()).longValue())); + + for (final Map.Entry entry : restoreListener.changelogToRestoreTime().entrySet()) { + final long lowerBound = entry.getValue() - TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); + final long upperBound = entry.getValue() + TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); + assertThat(taskIdToMetricValue.get("0_" + entry.getKey().partition()), allOf(greaterThanOrEqualTo(lowerBound), lessThanOrEqualTo(upperBound))); + } + } + private void validateReceivedMessages(final List> expectedRecords, final String outputTopic) throws Exception { final Properties consumerProperties = new Properties(); @@ -971,4 +1020,4 @@ public class RestoreIntegrationTest { } } } -} \ No newline at end of file +} diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index a7b3f838f2a..44ea3c7fc89 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -1337,6 +1337,8 @@ public class IntegrationTestUtils { public final Map changelogToStartOffset = new ConcurrentHashMap<>(); public final Map changelogToEndOffset = new ConcurrentHashMap<>(); public final Map changelogToTotalNumRestored = new ConcurrentHashMap<>(); + private final Map changelogToRestoreStartTime = new ConcurrentHashMap<>(); + private final Map changelogToRestoreEndTime = new ConcurrentHashMap<>(); private final AtomicLong restored; public TrackingStateRestoreListener() { @@ -1355,6 +1357,7 @@ public class IntegrationTestUtils { changelogToStartOffset.put(topicPartition, new AtomicLong(startingOffset)); changelogToEndOffset.put(topicPartition, new AtomicLong(endingOffset)); changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L)); + changelogToRestoreStartTime.put(topicPartition, new AtomicLong(System.nanoTime())); } @Override @@ -1372,6 +1375,7 @@ public class IntegrationTestUtils { if (restored != null) { restored.addAndGet(totalRestored); } + changelogToRestoreEndTime.put(topicPartition, new AtomicLong(System.nanoTime())); } public long totalNumRestored() { @@ -1381,6 +1385,11 @@ public class IntegrationTestUtils { } return totalNumRestored; } + + public Map changelogToRestoreTime() { + return changelogToRestoreStartTime.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> changelogToRestoreEndTime.get(e.getKey()).get() - e.getValue().get())); + } } public static class TrackingStandbyUpdateListener implements StandbyUpdateListener { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index c91e9b38b99..5e09ceb62da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.state.internals.MeteredStateStore; import org.slf4j.Logger; @@ -138,6 +139,8 @@ public class StoreChangelogReader implements ChangelogReader { // either due to limit offset (standby) or committed end offset (active) private int bufferedLimitIndex; + private long restoreStartTimeNs; + private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) { this.changelogState = ChangelogState.REGISTERED; this.storeMetadata = storeMetadata; @@ -188,6 +191,10 @@ public class StoreChangelogReader implements ChangelogReader { int bufferedLimitIndex() { return bufferedLimitIndex; } + + long calculateRestoreTime(final long restoreEndTimeNs) { + return restoreEndTimeNs - restoreStartTimeNs; + } } private static final long DEFAULT_OFFSET_UPDATE_MS = Duration.ofMinutes(5L).toMillis(); @@ -695,6 +702,9 @@ public class StoreChangelogReader implements ChangelogReader { changelogMetadata.transitTo(ChangelogState.COMPLETED); pauseChangelogsFromRestoreConsumer(Collections.singleton(partition)); + if (storeMetadata.store() instanceof MeteredStateStore) { + ((MeteredStateStore) storeMetadata.store()).recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds())); + } try { stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored); @@ -1026,6 +1036,7 @@ public class StoreChangelogReader implements ChangelogReader { // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); + changelogMetadata.restoreStartTimeNs = time.nanoseconds(); } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) { try { standbyUpdateListener.onUpdateStart(partition, storeName, startOffset); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 0962033b7ef..9c033d6bbd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -69,7 +69,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric */ public class MeteredKeyValueStore extends WrappedStateStore, K, V> - implements KeyValueStore { + implements KeyValueStore, MeteredStateStore { final Serde keySerde; final Serde valueSerde; @@ -91,6 +91,7 @@ public class MeteredKeyValueStore protected InternalProcessorContext internalContext; private StreamsMetricsImpl streamsMetrics; private TaskId taskId; + private Sensor restoreSensor; protected OpenIterators openIterators; @@ -128,11 +129,10 @@ public class MeteredKeyValueStore streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); registerMetrics(); - final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - // register and possibly restore the state from the logs - maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor); + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); + + super.init(stateStoreContext, root); } private void registerMetrics() { @@ -152,6 +152,11 @@ public class MeteredKeyValueStore openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics); } + @Override + public void recordRestoreTime(final long restoreTimeNs) { + restoreSensor.record(restoreTimeNs); + } + protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { return WrappingNullableUtils.prepareValueSerde(valueSerde, getter); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 234ac1220f7..7794a6ebc51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -57,7 +57,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric public class MeteredSessionStore extends WrappedStateStore, Windowed, V> - implements SessionStore { + implements SessionStore, MeteredStateStore { private final String metricsScope; private final Serde keySerde; @@ -73,6 +73,7 @@ public class MeteredSessionStore private Sensor iteratorDurationSensor; private InternalProcessorContext internalContext; private TaskId taskId; + private Sensor restoreSensor; private final LongAdder numOpenIterators = new LongAdder(); private final NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); @@ -108,11 +109,9 @@ public class MeteredSessionStore streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); registerMetrics(); - final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - // register and possibly restore the state from the logs - maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor); + super.init(stateStoreContext, root); } private void registerMetrics() { @@ -132,6 +131,11 @@ public class MeteredSessionStore ); } + @Override + public void recordRestoreTime(final long restoreTimeNs) { + restoreSensor.record(restoreTimeNs); + } + private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java new file mode 100644 index 00000000000..6b9f4eeb8f2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredStateStore.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +public interface MeteredStateStore { + + void recordRestoreTime(final long restoreTimeNs); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 2da877453ce..1ba37da6dab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -60,7 +60,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric public class MeteredWindowStore extends WrappedStateStore, Windowed, V> - implements WindowStore { + implements WindowStore, MeteredStateStore { private final long windowSizeMs; private final String metricsScope; @@ -76,6 +76,7 @@ public class MeteredWindowStore private Sensor iteratorDurationSensor; private InternalProcessorContext internalContext; private TaskId taskId; + private Sensor restoreSensor; private final LongAdder numOpenIterators = new LongAdder(); private final NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); @@ -124,8 +125,8 @@ public class MeteredWindowStore streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); registerMetrics(); - final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); + + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); // register and possibly restore the state from the logs maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor); @@ -150,6 +151,11 @@ public class MeteredWindowStore ); } + @Override + public void recordRestoreTime(final long restoreTimeNs) { + restoreSensor.record(restoreTimeNs); + } + private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 69655b4d642..99a2b2519f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; +import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; import org.apache.kafka.test.MockStandbyUpdateListener; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.StreamsTestUtils; @@ -89,7 +90,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -1364,6 +1367,58 @@ public class StoreChangelogReaderTest { } } + @Test + public void shouldCallRecordRestoreTimeAtTheEndOfRestore() { + setupActiveStateManager(); + + final MeteredKeyValueStore meteredStateStore = mock(MeteredKeyValueStore.class); + + when(storeMetadata.changelogPartition()).thenReturn(tp); + when(storeMetadata.store()).thenReturn(meteredStateStore); + when(meteredStateStore.name()).thenReturn(storeName); + final TaskId taskId = new TaskId(0, 0); + + when(storeMetadata.offset()).thenReturn(0L); + when(activeStateManager.taskId()).thenReturn(taskId); + + setupConsumer(2, tp); + consumer.updateEndOffsets(Collections.singletonMap(tp, 2L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 2L)); + + changelogReader.register(tp, activeStateManager); + + changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class))); + + assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored()); + verify(meteredStateStore).recordRestoreTime(anyLong()); + } + + @Test + public void shouldNotCallRecordRestoreTimeIfRestoreDoesNotComplete() { + setupActiveStateManager(); + + final MeteredKeyValueStore meteredStateStore = mock(MeteredKeyValueStore.class); + + when(storeMetadata.changelogPartition()).thenReturn(tp); + when(storeMetadata.store()).thenReturn(meteredStateStore); + when(meteredStateStore.name()).thenReturn(storeName); + final TaskId taskId = new TaskId(0, 0); + + when(storeMetadata.offset()).thenReturn(0L); + when(activeStateManager.taskId()).thenReturn(taskId); + + setupConsumer(2, tp); + consumer.updateEndOffsets(Collections.singletonMap(tp, 3L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 3L)); + + changelogReader.register(tp, activeStateManager); + + changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class))); + + assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored()); + verify(meteredStateStore, never()).recordRestoreTime(anyLong()); + } + private void setupConsumer(final long messages, final TopicPartition topicPartition) { assignPartition(messages, topicPartition); addRecords(messages, topicPartition); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 294af3944f2..1ba655a75ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -58,7 +58,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -210,16 +209,19 @@ public class MeteredKeyValueStoreTest { } @Test - public void shouldRecordRestoreLatencyOnInit() { + public void shouldRecordRestoreLatencyOnRecordRestoreTime() { setUp(); doNothing().when(inner).init(context, metered); init(); + final long restoreTimeNs = 1000L; + metered.recordRestoreTime(restoreTimeNs); + // it suffices to verify one restore metric since all restore metrics are recorded by the same sensor // and the sensor is tested elsewhere - final KafkaMetric metric = metric("restore-rate"); - assertThat((Double) metric.metricValue(), greaterThan(0.0)); + final KafkaMetric metric = metric("restore-latency-max"); + assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index ee1b686dade..f8b08a532d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -458,14 +458,17 @@ public class MeteredSessionStoreTest { } @Test - public void shouldRecordRestoreTimeOnInit() { + public void shouldRecordRestoreLatencyOnRecordRestoreTime() { setUp(); init(); + final long restoreTimeNs = 1000L; + store.recordRestoreTime(restoreTimeNs); + // it suffices to verify one restore metric since all restore metrics are recorded by the same sensor // and the sensor is tested elsewhere - final KafkaMetric metric = metric("restore-rate"); - assertTrue((Double) metric.metricValue() > 0); + final KafkaMetric metric = metric("restore-latency-max"); + assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index 8e8e02b2722..5c4509bc7a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -183,13 +183,6 @@ public class MeteredVersionedKeyValueStoreTest { verify(valueSerializer).serialize(changelogTopicName, VALUE); } - @Test - public void shouldRecordMetricsOnInit() { - // init is called in setUp(). it suffices to verify one restore metric since all restore - // metrics are recorded by the same sensor, and the sensor is tested elsewhere. - assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0)); - } - @Test public void shouldDelegateAndRecordMetricsOnPut() { when(inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP)).thenReturn(PUT_RETURN_CODE_VALID_TO_UNDEFINED); @@ -473,4 +466,4 @@ public class MeteredVersionedKeyValueStoreTest { .filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags)) .collect(Collectors.toList()); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 1c8935d1e1c..2726ce26aa7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -210,14 +210,19 @@ public class MeteredWindowStoreTest { } @Test - public void shouldRecordRestoreLatencyOnInit() { + public void shouldRecordRestoreLatencyOnRecordRestoreTime() { + setUp(); doNothing().when(innerStoreMock).init(context, store); + store.init(context, store); + final long restoreTimeNs = 1000L; + store.recordRestoreTime(restoreTimeNs); + // it suffices to verify one restore metric since all restore metrics are recorded by the same sensor // and the sensor is tested elsewhere - final KafkaMetric metric = metric("restore-rate"); - assertThat((Double) metric.metricValue(), greaterThan(0.0)); + final KafkaMetric metric = metric("restore-latency-max"); + assertThat((Double) metric.metricValue(), equalTo((double) restoreTimeNs)); } @Test