diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index e081ff72d6d..c22c914bd29 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -365,7 +365,7 @@ public class IQv2IntegrationTest { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index 7228a1d43d4..a0eeca81ba3 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -821,7 +821,7 @@ public class VersionedKeyValueStoreIntegrationTest { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 3b9d3c814a3..d299c883a6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -71,7 +71,7 @@ public interface StateStore { */ void init(final StateStoreContext stateStoreContext, final StateStore root); - void initMetricsIfNeeded(); + void assignThread(); /** * Flush any cached data diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 613870b9b5f..a2d8d1b52c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -49,7 +49,7 @@ abstract class AbstractReadWriteDecorator extends Wr } @Override - public void initMetricsIfNeeded() { + public void assignThread() { throw new UnsupportedOperationException(ERROR_MESSAGE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java index c3dc68c669a..acee901af33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java @@ -79,7 +79,7 @@ public class ReadOnlyTask implements Task { } @Override - public void initializeMetricsIfNeeded() { + public void assignThread() { throw new UnsupportedOperationException("This task is read-only"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 314165fb64e..4513e065ce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -129,11 +129,11 @@ public class StandbyTask extends AbstractTask implements Task { } @Override - public void initializeMetricsIfNeeded() { + public void assignThread() { closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics); for (final StateStore stateStore : topology.stateStores()) { - stateStore.initMetricsIfNeeded(); + stateStore.assignThread(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 19ed146830d..fb15b88626d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -280,9 +280,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } @Override - public void initializeMetricsIfNeeded() { + public void assignThread() { for (final StateStore stateStore : topology.stateStores()) { - stateStore.initMetricsIfNeeded(); + stateStore.assignThread(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 7fc64f13e18..3f338beea39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -110,7 +110,7 @@ public interface Task { */ void initializeIfNeeded(); - void initializeMetricsIfNeeded(); + void assignThread(); default void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { throw new UnsupportedOperationException(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c30ceb1e09a..c8a922f9533 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -343,7 +343,7 @@ public class TaskManager { final TaskId taskId = entry.getKey(); final Task task = stateDirectory.removeStartupTask(taskId); if (task != null) { - task.initializeMetricsIfNeeded(); + task.assignThread(); // replace our dummy values with the real ones, now we know our thread and assignment final Set inputPartitions = entry.getValue(); @@ -930,7 +930,7 @@ public class TaskManager { for (final Task task : tasks.allTasks()) { try { task.initializeIfNeeded(); - task.initializeMetricsIfNeeded(); + task.assignThread(); task.clearTaskTimeout(); } catch (final LockException lockException) { // it is possible that if there are multiple threads within the instance that one thread @@ -1085,7 +1085,7 @@ public class TaskManager { try { if (canTryInitializeTask(task.id(), nowMs)) { task.initializeIfNeeded(); - task.initializeMetricsIfNeeded(); + task.assignThread(); taskIdToBackoffRecord.remove(task.id()); stateUpdater.add(task); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index f4afc669e88..788e474c9e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -273,7 +273,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements Se } @Override - public void initMetricsIfNeeded() { + public void assignThread() { registerMetrics(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 83343d04494..f03273b4fb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -104,6 +104,11 @@ public class CachingKeyValueStore } }); super.init(stateStoreContext, root); + } + + @Override + public void assignThread() { + super.assignThread(); // save the stream thread as we only ever want to trigger a flush // when the stream thread is the current thread. streamThread = Thread.currentThread(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index fe847f33909..01ca5116559 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -99,7 +99,7 @@ public class InMemoryKeyValueStore implements KeyValueStore { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index a0b563301f2..db6d95f0079 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -135,7 +135,7 @@ public class InMemorySessionStore implements SessionStore { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { if (context != null) { registerMetrics(); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java index 0c74852fe31..d3587cc1472 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java @@ -209,7 +209,7 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer implements T } @Override - public void initMetricsIfNeeded() { + public void assignThread() { registerMetrics(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index ab8517f61b7..2d9f2f3290d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -140,7 +140,7 @@ public class InMemoryWindowStore implements WindowStore { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { registerMetrics(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java index ccc60c6d36d..e7b2c39170c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java @@ -65,7 +65,7 @@ class KeyValueSegments extends AbstractSegments { @Override public void openExisting(final StateStoreContext context, final long streamTime) { metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); - metricsRecorder.initMetricsIfNeeded(); + metricsRecorder.assignThread(); super.openExisting(context, streamTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index 81995836fe7..b17ce42fbf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -127,8 +127,8 @@ public class KeyValueStoreWrapper implements StateStore { } @Override - public void initMetricsIfNeeded() { - store.initMetricsIfNeeded(); + public void assignThread() { + store.assignThread(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index d908c136c1e..4ff06f9db41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -100,8 +100,8 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt } @Override - public void initMetricsIfNeeded() { - store.initMetricsIfNeeded(); + public void assignThread() { + store.assignThread(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index 71aced68f13..ca8635963ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -142,7 +142,7 @@ class LogicalKeyValueSegment implements Comparable, Segm } @Override - public void initMetricsIfNeeded() { + public void assignThread() { throw new UnsupportedOperationException("nothing to reassign"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index 03a6f7fc1db..f682b725111 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -105,7 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index e63697c2fd4..135c0537524 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -106,6 +106,7 @@ public class MeteredKeyValueStore (query, positionBound, config, store) -> runKeyQuery(query, positionBound, config) ) ); + private Sensor restoreSensor; MeteredKeyValueStore(final KeyValueStore inner, final String metricsScope, @@ -127,17 +128,13 @@ public class MeteredKeyValueStore initStoreSerde(stateStoreContext); streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); - final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - - // register and possibly restore the state from the logs - maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor); + super.init(stateStoreContext, root); } @Override - public void initMetricsIfNeeded() { + public void assignThread() { registerMetrics(); - super.initMetricsIfNeeded(); + super.assignThread(); } private void registerMetrics() { @@ -155,6 +152,7 @@ public class MeteredKeyValueStore StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> openIterators.sum()); openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics); + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); } protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3f3c763edd1..1ecb8d5713e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -180,8 +180,8 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS } @Override - public void initMetricsIfNeeded() { - metricsRecorder.initMetricsIfNeeded(); + public void assignThread() { + metricsRecorder.assignThread(); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 0b85c0a5dc9..cae04f16793 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -195,7 +195,7 @@ public class RocksDBTimeOrderedKeyValueBuffer implements TimeOrderedKeyVal } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 5049ac2e0ce..432191335e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -383,9 +383,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore { @Override public void openExisting(final StateStoreContext context, final long streamTime) { metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); - metricsRecorder.initMetricsIfNeeded(); + metricsRecorder.assignThread(); super.openExisting(context, streamTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java index cf36895b787..072ebbe0e6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java @@ -93,8 +93,8 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore } @Override - public void initMetricsIfNeeded() { - inner.initMetricsIfNeeded(); + public void assignThread() { + inner.assignThread(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index d55161c6aec..0bec148bb91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -161,8 +161,8 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore implements S } @Override - public void initMetricsIfNeeded() { - wrapped.initMetricsIfNeeded(); + public void assignThread() { + wrapped.assignThread(); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 4a85e568f5f..5f5f49c645f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -150,7 +150,7 @@ public class RocksDBMetricsRecorder { this.streamsMetrics = streamsMetrics; } - public void initMetricsIfNeeded() { + public void assignThread() { final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName); initSensors(streamsMetrics, metricContext); initGauges(streamsMetrics, metricContext); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 9abcde4d11d..608dab11ca6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -4922,7 +4922,7 @@ public class TaskManagerTest { } @Override - public void initializeMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 816f8439bfd..882f803c23d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -59,7 +59,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index fbb26ae1682..be2368dae92 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -381,7 +381,7 @@ public class ReadOnlyWindowStoreStub implements ReadOnlyWindowStore, public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index 1d6ae0a14f9..f51aa9ed37f 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -65,7 +65,7 @@ public class MockKeyValueStore implements KeyValueStore { } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index 3b863efbea5..6311eb15981 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -89,7 +89,7 @@ public class NoOpReadOnlyStore implements ReadOnlyKeyValueStore, Sta } @Override - public void initMetricsIfNeeded() { + public void assignThread() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 08cd79b06ac..324d7b69cac 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -187,7 +187,7 @@ public class ReadOnlySessionStoreStub implements ReadOnlySessionStore