KAFKA-8859: Expose built-in streams metrics version in `StreamsMetricsImpl` (#7323)

The streams config built.in.metrics.version is needed to add metrics in
a backward-compatible way. However, not in every location where metrics are
added a streams config is available to check built.in.metrics.version. Thus,
the config value needs to be exposed through the StreamsMetricsImpl object.

Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2019-09-17 04:48:25 +00:00 committed by Guozhang Wang
parent c5dfb90b46
commit bab3e082dc
13 changed files with 116 additions and 39 deletions

View File

@ -189,7 +189,11 @@ public class GlobalStreamThread extends Thread {
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);
this.streamsMetrics = new StreamsMetricsImpl(
metrics,
threadClientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
);
this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
this.logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());

View File

@ -596,7 +596,11 @@ public class StreamThread extends Thread {
threadProducer = clientSupplier.getProducer(producerConfigs);
}
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
threadClientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import java.util.Arrays;
@ -42,10 +43,17 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class StreamsMetricsImpl implements StreamsMetrics {
public enum Version {
LATEST,
FROM_100_TO_23
}
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final String threadName;
private final Version version;
private final Deque<String> threadLevelSensors = new LinkedList<>();
private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
@ -83,14 +91,28 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
public static final String LATE_RECORD_DROP = "late-record-drop";
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
public StreamsMetricsImpl(final Metrics metrics, final String threadName, final String builtInMetricsVersion) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
this.metrics = metrics;
this.threadName = threadName;
this.version = parseBuiltInMetricsVersion(builtInMetricsVersion);
this.parentSensors = new HashMap<>();
}
private static Version parseBuiltInMetricsVersion(final String builtInMetricsVersion) {
if (builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST)) {
return Version.LATEST;
} else {
return Version.FROM_100_TO_23;
}
}
public Version version() {
return version;
}
public final Sensor threadLevelSensor(final String sensorName,
final RecordingLevel recordingLevel,
final Sensor... parents) {

View File

@ -17,11 +17,12 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
public class MockStreamsMetrics extends StreamsMetricsImpl {
public MockStreamsMetrics(final Metrics metrics) {
super(metrics, "test");
super(metrics, "test", StreamsConfig.METRICS_LATEST);
}
}

View File

@ -160,7 +160,8 @@ public class StandbyTaskTest {
private final byte[] recordKey = intSerializer.serialize(null, 1);
private final String threadName = "threadName";
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName);
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(new Metrics(), threadName, StreamsConfig.METRICS_LATEST);
@Before
public void setup() throws Exception {

View File

@ -340,7 +340,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -466,7 +466,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -501,7 +501,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -651,7 +651,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -684,7 +685,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -756,7 +758,8 @@ public class StreamThreadTest {
taskManager.setConsumer(mockStreamThreadConsumer);
taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -790,7 +793,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -1184,7 +1188,8 @@ public class StreamThreadTest {
private StandbyTask createStandbyTask() {
final LogContext logContext = new LogContext("test");
final Logger log = logContext.logger(StreamThreadTest.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator(
internalTopologyBuilder,
config,
@ -1621,7 +1626,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
@ -1660,7 +1666,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Test;
@ -50,6 +52,8 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
private final static String SENSOR_PREFIX_DELIMITER = ".";
private final static String SENSOR_NAME_DELIMITER = ".s.";
private final static String INTERNAL_PREFIX = "internal";
private final static String THREAD_NAME = "test-thread";
private final static String VERSION = StreamsConfig.METRICS_LATEST;
private final Metrics metrics = new Metrics();
private final Sensor sensor = metrics.sensor("dummy");
@ -59,21 +63,21 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
private final String description1 = "description number one";
private final String description2 = "description number two";
private final MockTime time = new MockTime(0);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION);
@Test
public void shouldGetThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final String threadName = "thread1";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_NAME, VERSION);
final String sensorName = "sensor1";
final String expectedFullSensorName =
INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + sensorName;
INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + THREAD_NAME + SENSOR_NAME_DELIMITER + sensorName;
final RecordingLevel recordingLevel = RecordingLevel.DEBUG;
final Sensor[] parents = {};
EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null);
replayAll();
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel);
verifyAll();
@ -83,12 +87,11 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
@Test(expected = NullPointerException.class)
public void testNullMetrics() {
new StreamsMetricsImpl(null, "");
new StreamsMetricsImpl(null, "", VERSION);
}
@Test(expected = NullPointerException.class)
public void testRemoveNullSensor() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
streamsMetrics.removeSensor(null);
}
@ -98,7 +101,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor1);
@ -116,9 +118,9 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
}
@Test
public void testMutiLevelSensorRemoval() {
public void testMultiLevelSensorRemoval() {
final Metrics registry = new Metrics();
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_NAME, VERSION);
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}
@ -169,7 +171,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
@Test
public void testLatencyMetrics() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
final String scope = "scope";
@ -189,7 +190,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
@Test
public void testThroughputMetrics() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
final String scope = "scope";
@ -211,7 +211,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
final MockTime time = new MockTime(1);
final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS);
final Metrics metrics = new Metrics(config, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", VERSION);
final String scope = "scope";
final String entity = "entity";
@ -245,8 +245,6 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
@Test
public void shouldGetStoreLevelTagMap() {
final String threadName = "test-thread";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
final String taskName = "test-task";
final String storeType = "remote-window";
final String storeName = "window-keeper";
@ -254,7 +252,7 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
assertThat(tagMap.size(), equalTo(3));
assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(threadName));
assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(THREAD_NAME));
assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName));
assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName));
}
@ -327,6 +325,22 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
new StreamsMetricsImpl(metrics, THREAD_NAME, StreamsConfig.METRICS_LATEST).version(),
equalTo(Version.LATEST)
);
}
@Test
public void shouldReturnMetricsVersionFrom100To23() {
assertThat(
new StreamsMetricsImpl(metrics, THREAD_NAME, StreamsConfig.METRICS_0100_TO_23).version(),
equalTo(Version.FROM_100_TO_23)
);
}
private void verifyMetric(final String name,
final String description,
final double valueToRecord1,

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
@ -90,7 +91,9 @@ public class GlobalStateStoreProviderTest {
final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
expect(mockContext.applicationId()).andReturn("appId").anyTimes();
expect(mockContext.metrics()).andReturn(new StreamsMetricsImpl(new Metrics(), "threadName")).anyTimes();
expect(mockContext.metrics())
.andReturn(new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST))
.anyTimes();
expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
expect(mockContext.recordCollector()).andReturn(null).anyTimes();
replay(mockContext);

View File

@ -59,7 +59,8 @@ public class MeteredTimestampedWindowStoreTest {
@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST);
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),

View File

@ -75,7 +75,8 @@ public class MeteredWindowStoreTest {
@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST);
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),

View File

@ -69,7 +69,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this(null,
null,
null,
new StreamsMetricsImpl(new Metrics(), "mock"),
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
null,
null
@ -78,14 +78,30 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
public InternalMockProcessorContext(final File stateDir,
final StreamsConfig config) {
this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
this(
stateDir,
null,
null,
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
config,
null,
null
);
}
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valSerde,
final StreamsConfig config) {
this(stateDir, keySerde, valSerde, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
this(
stateDir,
keySerde,
valSerde,
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
config,
null,
null
);
}
public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
@ -100,7 +116,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
null,
serdes.keySerde(),
serdes.valueSerde(),
new StreamsMetricsImpl(metrics, "mock"),
new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
null
@ -115,7 +131,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this(stateDir,
keySerde,
valSerde,
new StreamsMetricsImpl(new Metrics(), "mock"),
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
cache

View File

@ -280,7 +280,11 @@ public class TopologyTestDriver implements Closeable {
metrics = new Metrics(metricConfig, mockWallClockTime);
final String threadName = "topology-test-driver-virtual-thread";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
threadName,
StreamsConfig.METRICS_LATEST
);
final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
final String threadLevelGroup = "stream-metrics";
skippedRecordsSensor.add(new MetricName("skipped-records-rate",

View File

@ -214,7 +214,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
final MetricConfig metricConfig = new MetricConfig();
metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
final String threadName = "mock-processor-context-virtual-thread";
this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName);
this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName, StreamsConfig.METRICS_LATEST);
ThreadMetrics.skipRecordSensor(metrics);
}