diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 0b6539a3c7d..f0aaab64b56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -189,7 +189,11 @@ public class GlobalStreamThread extends Thread { this.topology = topology; this.globalConsumer = globalConsumer; this.stateDirectory = stateDirectory; - this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId); + this.streamsMetrics = new StreamsMetricsImpl( + metrics, + threadClientId, + config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG) + ); this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId); this.logContext = new LogContext(logPrefix); this.log = logContext.logger(getClass()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1c88054d232..00b376d3a10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -596,7 +596,11 @@ public class StreamThread extends Thread { threadProducer = clientSupplier.getProducer(producerConfigs); } - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( + metrics, + threadClientId, + config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG) + ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 34338e1502d..1e7c1aee480 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.metrics.stats.WindowedSum; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import java.util.Arrays; @@ -42,10 +43,17 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; public class StreamsMetricsImpl implements StreamsMetrics { + + public enum Version { + LATEST, + FROM_100_TO_23 + } + private final Metrics metrics; private final Map parentSensors; private final String threadName; + private final Version version; private final Deque threadLevelSensors = new LinkedList<>(); private final Map> taskLevelSensors = new HashMap<>(); private final Map> nodeLevelSensors = new HashMap<>(); @@ -83,14 +91,28 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop"; public static final String LATE_RECORD_DROP = "late-record-drop"; - public StreamsMetricsImpl(final Metrics metrics, final String threadName) { + public StreamsMetricsImpl(final Metrics metrics, final String threadName, final String builtInMetricsVersion) { Objects.requireNonNull(metrics, "Metrics cannot be null"); + Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null"); this.metrics = metrics; this.threadName = threadName; + this.version = parseBuiltInMetricsVersion(builtInMetricsVersion); this.parentSensors = new HashMap<>(); } + private static Version parseBuiltInMetricsVersion(final String builtInMetricsVersion) { + if (builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST)) { + return Version.LATEST; + } else { + return Version.FROM_100_TO_23; + } + } + + public Version version() { + return version; + } + public final Sensor threadLevelSensor(final String sensorName, final RecordingLevel recordingLevel, final Sensor... parents) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java index bd3553083ff..273f4b9e45a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -17,11 +17,12 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; public class MockStreamsMetrics extends StreamsMetricsImpl { public MockStreamsMetrics(final Metrics metrics) { - super(metrics, "test"); + super(metrics, "test", StreamsConfig.METRICS_LATEST); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index df889798a86..033aeeefd7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -160,7 +160,8 @@ public class StandbyTaskTest { private final byte[] recordKey = intSerializer.serialize(null, 1); private final String threadName = "threadName"; - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName); + private final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(new Metrics(), threadName, StreamsConfig.METRICS_LATEST); @Before public void setup() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5f7d9dd56da..d60c0069039 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -340,7 +340,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -466,7 +466,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -501,7 +501,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -651,7 +651,8 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -684,7 +685,8 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -756,7 +758,8 @@ public class StreamThreadTest { taskManager.setConsumer(mockStreamThreadConsumer); taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap()); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -790,7 +793,8 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -1184,7 +1188,8 @@ public class StreamThreadTest { private StandbyTask createStandbyTask() { final LogContext logContext = new LogContext("test"); final Logger log = logContext.logger(StreamThreadTest.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator( internalTopologyBuilder, config, @@ -1621,7 +1626,8 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, @@ -1660,7 +1666,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST); final StreamThread thread = new StreamThread( mockTime, config, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 4fd6f88a95d..f1d951bab96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Test; @@ -50,6 +52,8 @@ public class StreamsMetricsImplTest extends EasyMockSupport { private final static String SENSOR_PREFIX_DELIMITER = "."; private final static String SENSOR_NAME_DELIMITER = ".s."; private final static String INTERNAL_PREFIX = "internal"; + private final static String THREAD_NAME = "test-thread"; + private final static String VERSION = StreamsConfig.METRICS_LATEST; private final Metrics metrics = new Metrics(); private final Sensor sensor = metrics.sensor("dummy"); @@ -59,21 +63,21 @@ public class StreamsMetricsImplTest extends EasyMockSupport { private final String description1 = "description number one"; private final String description2 = "description number two"; private final MockTime time = new MockTime(0); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION); @Test public void shouldGetThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); - final String threadName = "thread1"; + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION); final String sensorName = "sensor1"; final String expectedFullSensorName = - INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + sensorName; + INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + THREAD_NAME + SENSOR_NAME_DELIMITER + sensorName; final RecordingLevel recordingLevel = RecordingLevel.DEBUG; final Sensor[] parents = {}; EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null); replayAll(); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel); verifyAll(); @@ -83,12 +87,11 @@ public class StreamsMetricsImplTest extends EasyMockSupport { @Test(expected = NullPointerException.class) public void testNullMetrics() { - new StreamsMetricsImpl(null, ""); + new StreamsMetricsImpl(null, "", VERSION); } @Test(expected = NullPointerException.class) public void testRemoveNullSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), ""); streamsMetrics.removeSensor(null); } @@ -98,7 +101,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport { final String scope = "scope"; final String entity = "entity"; final String operation = "put"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), ""); final Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor1); @@ -116,9 +118,9 @@ public class StreamsMetricsImplTest extends EasyMockSupport { } @Test - public void testMutiLevelSensorRemoval() { + public void testMultiLevelSensorRemoval() { final Metrics registry = new Metrics(); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, ""); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_NAME, VERSION); for (final MetricName defaultMetric : registry.metrics().keySet()) { registry.removeMetric(defaultMetric); } @@ -169,7 +171,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport { @Test public void testLatencyMetrics() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), ""); final int defaultMetrics = streamsMetrics.metrics().size(); final String scope = "scope"; @@ -189,7 +190,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport { @Test public void testThroughputMetrics() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), ""); final int defaultMetrics = streamsMetrics.metrics().size(); final String scope = "scope"; @@ -211,7 +211,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport { final MockTime time = new MockTime(1); final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS); final Metrics metrics = new Metrics(config, time); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", VERSION); final String scope = "scope"; final String entity = "entity"; @@ -245,8 +245,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport { @Test public void shouldGetStoreLevelTagMap() { - final String threadName = "test-thread"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); final String taskName = "test-task"; final String storeType = "remote-window"; final String storeName = "window-keeper"; @@ -254,7 +252,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport { final Map tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); assertThat(tagMap.size(), equalTo(3)); - assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(threadName)); + assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(THREAD_NAME)); assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName)); assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName)); } @@ -327,6 +325,22 @@ public class StreamsMetricsImplTest extends EasyMockSupport { assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics } + @Test + public void shouldReturnMetricsVersionCurrent() { + assertThat( + new StreamsMetricsImpl(metrics, THREAD_NAME, StreamsConfig.METRICS_LATEST).version(), + equalTo(Version.LATEST) + ); + } + + @Test + public void shouldReturnMetricsVersionFrom100To23() { + assertThat( + new StreamsMetricsImpl(metrics, THREAD_NAME, StreamsConfig.METRICS_0100_TO_23).version(), + equalTo(Version.FROM_100_TO_23) + ); + } + private void verifyMetric(final String name, final String description, final double valueToRecord1, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index 9c76e14d253..1ab8684e905 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -90,7 +91,9 @@ public class GlobalStateStoreProviderTest { final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class); expect(mockContext.applicationId()).andReturn("appId").anyTimes(); - expect(mockContext.metrics()).andReturn(new StreamsMetricsImpl(new Metrics(), "threadName")).anyTimes(); + expect(mockContext.metrics()) + .andReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST)) + .anyTimes(); expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes(); expect(mockContext.recordCollector()).andReturn(null).anyTimes(); replay(mockContext); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index e3378faf885..53096d2bf13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -59,7 +59,8 @@ public class MeteredTimestampedWindowStoreTest { @Before public void setUp() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test"); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST); context = new InternalMockProcessorContext( TestUtils.tempDirectory(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index bd5bce1632f..e62ab3a8a42 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -75,7 +75,8 @@ public class MeteredWindowStoreTest { @Before public void setUp() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test"); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST); context = new InternalMockProcessorContext( TestUtils.tempDirectory(), diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 4b9267959ac..0935ae300f7 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -69,7 +69,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple this(null, null, null, - new StreamsMetricsImpl(new Metrics(), "mock"), + new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null @@ -78,14 +78,30 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple public InternalMockProcessorContext(final File stateDir, final StreamsConfig config) { - this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null); + this( + stateDir, + null, + null, + new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), + config, + null, + null + ); } public InternalMockProcessorContext(final File stateDir, final Serde keySerde, final Serde valSerde, final StreamsConfig config) { - this(stateDir, keySerde, valSerde, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null); + this( + stateDir, + keySerde, + valSerde, + new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), + config, + null, + null + ); } public InternalMockProcessorContext(final StateSerdes serdes, @@ -100,7 +116,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple null, serdes.keySerde(), serdes.valueSerde(), - new StreamsMetricsImpl(metrics, "mock"), + new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, null @@ -115,7 +131,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple this(stateDir, keySerde, valSerde, - new StreamsMetricsImpl(new Metrics(), "mock"), + new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, cache diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index d0ca6eae253..0a0351caede 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -280,7 +280,11 @@ public class TopologyTestDriver implements Closeable { metrics = new Metrics(metricConfig, mockWallClockTime); final String threadName = "topology-test-driver-virtual-thread"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( + metrics, + threadName, + StreamsConfig.METRICS_LATEST + ); final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); final String threadLevelGroup = "stream-metrics"; skippedRecordsSensor.add(new MetricName("skipped-records-rate", diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index c478b6ba2bf..041bf0d65d8 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -214,7 +214,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S final MetricConfig metricConfig = new MetricConfig(); metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); final String threadName = "mock-processor-context-virtual-thread"; - this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName); + this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName, StreamsConfig.METRICS_LATEST); ThreadMetrics.skipRecordSensor(metrics); }