mirror of https://github.com/apache/kafka.git
KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" (#17182)
This PR simply StreamsMetricsImpl to avoid passing in the unused "metric version" parameter. Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
6cb70a831c
commit
27a3c75216
|
@ -1002,7 +1002,6 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
streamsMetrics = new StreamsMetricsImpl(
|
streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
clientId,
|
clientId,
|
||||||
applicationConfigs.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
time
|
time
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -162,10 +162,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
|
|
||||||
public StreamsMetricsImpl(final Metrics metrics,
|
public StreamsMetricsImpl(final Metrics metrics,
|
||||||
final String clientId,
|
final String clientId,
|
||||||
final String builtInMetricsVersion,
|
|
||||||
final Time time) {
|
final Time time) {
|
||||||
Objects.requireNonNull(metrics, "Metrics cannot be null");
|
Objects.requireNonNull(metrics, "Metrics cannot be null");
|
||||||
Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
|
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
version = Version.LATEST;
|
version = Version.LATEST;
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||||
|
|
||||||
private final MockTime time = new MockTime();
|
private final MockTime time = new MockTime();
|
||||||
private final Metrics metrics = new Metrics();
|
private final Metrics metrics = new Metrics();
|
||||||
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time);
|
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time);
|
||||||
private final String threadId = Thread.currentThread().getName();
|
private final String threadId = Thread.currentThread().getName();
|
||||||
private final Initializer<Long> initializer = () -> 0L;
|
private final Initializer<Long> initializer = () -> 0L;
|
||||||
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
|
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
|
||||||
private ChangelogReader changeLogReader;
|
private ChangelogReader changeLogReader;
|
||||||
|
|
||||||
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
|
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
|
||||||
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST, new MockTime());
|
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
|
||||||
private final Map<String, Object> properties = mkMap(
|
private final Map<String, Object> properties = mkMap(
|
||||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
|
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
|
||||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
|
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
|
||||||
|
|
|
@ -134,7 +134,7 @@ public class GlobalStreamThreadTest {
|
||||||
mockConsumer,
|
mockConsumer,
|
||||||
new StateDirectory(config, time, true, false),
|
new StateDirectory(config, time, true, false),
|
||||||
0,
|
0,
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, time),
|
new StreamsMetricsImpl(new Metrics(), "test-client", time),
|
||||||
time,
|
time,
|
||||||
"clientId",
|
"clientId",
|
||||||
stateRestoreListener,
|
stateRestoreListener,
|
||||||
|
@ -173,7 +173,7 @@ public class GlobalStreamThreadTest {
|
||||||
mockConsumer,
|
mockConsumer,
|
||||||
new StateDirectory(config, time, true, false),
|
new StateDirectory(config, time, true, false),
|
||||||
0,
|
0,
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, time),
|
new StreamsMetricsImpl(new Metrics(), "test-client", time),
|
||||||
time,
|
time,
|
||||||
"clientId",
|
"clientId",
|
||||||
stateRestoreListener,
|
stateRestoreListener,
|
||||||
|
|
|
@ -18,12 +18,11 @@ package org.apache.kafka.streams.processor.internals;
|
||||||
|
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
|
||||||
public class MockStreamsMetrics extends StreamsMetricsImpl {
|
public class MockStreamsMetrics extends StreamsMetricsImpl {
|
||||||
|
|
||||||
public MockStreamsMetrics(final Metrics metrics) {
|
public MockStreamsMetrics(final Metrics metrics) {
|
||||||
super(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
|
super(metrics, "test", new MockTime());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,7 +215,7 @@ public class ProcessorNodeTest {
|
||||||
public void testMetricsWithBuiltInMetricsVersionLatest() {
|
public void testMetricsWithBuiltInMetricsVersionLatest() {
|
||||||
final Metrics metrics = new Metrics();
|
final Metrics metrics = new Metrics();
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
|
||||||
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
|
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
|
||||||
final ProcessorNode<Object, Object, Object, Object> node =
|
final ProcessorNode<Object, Object, Object, Object> node =
|
||||||
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
|
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
|
||||||
|
@ -299,7 +299,7 @@ public class ProcessorNodeTest {
|
||||||
public void testTopologyLevelClassCastExceptionDirect() {
|
public void testTopologyLevelClassCastExceptionDirect() {
|
||||||
final Metrics metrics = new Metrics();
|
final Metrics metrics = new Metrics();
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
|
||||||
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
|
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
|
||||||
final ProcessorNode<Object, Object, Object, Object> node =
|
final ProcessorNode<Object, Object, Object, Object> node =
|
||||||
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
|
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
|
||||||
|
@ -319,7 +319,7 @@ public class ProcessorNodeTest {
|
||||||
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
|
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
|
||||||
|
|
||||||
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
|
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
|
||||||
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()));
|
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
|
||||||
when(internalProcessorContext.topic()).thenReturn(TOPIC);
|
when(internalProcessorContext.topic()).thenReturn(TOPIC);
|
||||||
when(internalProcessorContext.partition()).thenReturn(PARTITION);
|
when(internalProcessorContext.partition()).thenReturn(PARTITION);
|
||||||
when(internalProcessorContext.offset()).thenReturn(OFFSET);
|
when(internalProcessorContext.offset()).thenReturn(OFFSET);
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.Serializer;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
|
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.StreamsException;
|
import org.apache.kafka.streams.errors.StreamsException;
|
||||||
|
@ -73,7 +72,7 @@ public class RecordQueueTest {
|
||||||
|
|
||||||
private final Metrics metrics = new Metrics();
|
private final Metrics metrics = new Metrics();
|
||||||
private final StreamsMetricsImpl streamsMetrics =
|
private final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "mock", new MockTime());
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
|
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.metrics.SensorAccessor;
|
import org.apache.kafka.common.metrics.SensorAccessor;
|
||||||
import org.apache.kafka.common.serialization.Deserializer;
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.errors.StreamsException;
|
import org.apache.kafka.streams.errors.StreamsException;
|
||||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
@ -98,7 +97,7 @@ public class SourceNodeTest {
|
||||||
public void shouldExposeProcessMetrics() {
|
public void shouldExposeProcessMetrics() {
|
||||||
final Metrics metrics = new Metrics();
|
final Metrics metrics = new Metrics();
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
|
||||||
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
|
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
|
||||||
final SourceNode<String, String> node =
|
final SourceNode<String, String> node =
|
||||||
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
|
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class StandbyTaskTest {
|
||||||
|
|
||||||
private final MockTime time = new MockTime();
|
private final MockTime time = new MockTime();
|
||||||
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
|
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
|
||||||
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, StreamsConfig.METRICS_LATEST, time);
|
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time);
|
||||||
|
|
||||||
private File baseDir;
|
private File baseDir;
|
||||||
private StreamsConfig config;
|
private StreamsConfig config;
|
||||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
|
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
|
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
|
||||||
import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse;
|
|
||||||
import org.apache.kafka.streams.errors.ProcessorStateException;
|
import org.apache.kafka.streams.errors.ProcessorStateException;
|
||||||
import org.apache.kafka.streams.errors.StreamsException;
|
import org.apache.kafka.streams.errors.StreamsException;
|
||||||
import org.apache.kafka.streams.errors.TaskCorruptedException;
|
import org.apache.kafka.streams.errors.TaskCorruptedException;
|
||||||
|
@ -609,7 +608,7 @@ public class StreamTaskTest {
|
||||||
public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
|
public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig());
|
||||||
|
|
||||||
assertFalse(task.process(time.milliseconds()));
|
assertFalse(task.process(time.milliseconds()));
|
||||||
|
|
||||||
|
@ -632,7 +631,7 @@ public class StreamTaskTest {
|
||||||
public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() {
|
public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0"));
|
||||||
|
|
||||||
assertFalse(task.process(time.milliseconds()));
|
assertFalse(task.process(time.milliseconds()));
|
||||||
|
|
||||||
|
@ -656,7 +655,7 @@ public class StreamTaskTest {
|
||||||
public void shouldRecordBufferedRecords() {
|
public void shouldRecordBufferedRecords() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
|
|
||||||
final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString());
|
final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString());
|
||||||
|
|
||||||
|
@ -734,13 +733,13 @@ public class StreamTaskTest {
|
||||||
final String sourceNodeName = evenKeyForwardingSourceNode.name();
|
final String sourceNodeName = evenKeyForwardingSourceNode.name();
|
||||||
final String terminalNodeName = processorStreamTime.name();
|
final String terminalNodeName = processorStreamTime.name();
|
||||||
|
|
||||||
final Metric sourceAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric sourceAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNodeName);
|
||||||
final Metric sourceMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric sourceMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNodeName);
|
||||||
final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNodeName);
|
||||||
|
|
||||||
final Metric terminalAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric terminalAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), terminalNodeName);
|
||||||
final Metric terminalMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric terminalMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), terminalNodeName);
|
||||||
final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
|
final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName);
|
||||||
|
|
||||||
// e2e latency = 10
|
// e2e latency = 10
|
||||||
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
|
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
|
||||||
|
@ -802,7 +801,7 @@ public class StreamTaskTest {
|
||||||
public void shouldRecordRestoredRecords() {
|
public void shouldRecordRestoredRecords() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
|
|
||||||
final KafkaMetric totalMetric = getMetric("restore", "%s-total", task.id().toString());
|
final KafkaMetric totalMetric = getMetric("restore", "%s-total", task.id().toString());
|
||||||
final KafkaMetric rateMetric = getMetric("restore", "%s-rate", task.id().toString());
|
final KafkaMetric rateMetric = getMetric("restore", "%s-rate", task.id().toString());
|
||||||
|
@ -927,7 +926,6 @@ public class StreamTaskTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testMetricsForBuiltInMetricsVersionLatest() {
|
private void testMetricsForBuiltInMetricsVersionLatest() {
|
||||||
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
|
|
||||||
assertNull(getMetric("commit", "%s-latency-avg", "all"));
|
assertNull(getMetric("commit", "%s-latency-avg", "all"));
|
||||||
assertNull(getMetric("commit", "%s-latency-max", "all"));
|
assertNull(getMetric("commit", "%s-latency-max", "all"));
|
||||||
assertNull(getMetric("commit", "%s-rate", "all"));
|
assertNull(getMetric("commit", "%s-rate", "all"));
|
||||||
|
@ -960,8 +958,7 @@ public class StreamTaskTest {
|
||||||
private Metric getProcessorMetric(final String operation,
|
private Metric getProcessorMetric(final String operation,
|
||||||
final String nameFormat,
|
final String nameFormat,
|
||||||
final String taskId,
|
final String taskId,
|
||||||
final String processorNodeId,
|
final String processorNodeId) {
|
||||||
final String builtInMetricsVersion) {
|
|
||||||
|
|
||||||
return getMetricByNameFilterByTags(
|
return getMetricByNameFilterByTags(
|
||||||
metrics.metrics(),
|
metrics.metrics(),
|
||||||
|
@ -1213,7 +1210,7 @@ public class StreamTaskTest {
|
||||||
public void shouldRespectCommitNeeded() {
|
public void shouldRespectCommitNeeded() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
task.initializeIfNeeded();
|
task.initializeIfNeeded();
|
||||||
task.completeRestoration(noOpResetter -> { });
|
task.completeRestoration(noOpResetter -> { });
|
||||||
|
|
||||||
|
@ -1255,7 +1252,7 @@ public class StreamTaskTest {
|
||||||
public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() {
|
public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
task.initializeIfNeeded();
|
task.initializeIfNeeded();
|
||||||
task.completeRestoration(noOpResetter -> { });
|
task.completeRestoration(noOpResetter -> { });
|
||||||
|
|
||||||
|
@ -2311,7 +2308,7 @@ public class StreamTaskTest {
|
||||||
public void shouldClearCommitStatusesInCloseDirty() {
|
public void shouldClearCommitStatusesInCloseDirty() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
task.initializeIfNeeded();
|
task.initializeIfNeeded();
|
||||||
task.completeRestoration(noOpResetter -> { });
|
task.completeRestoration(noOpResetter -> { });
|
||||||
|
|
||||||
|
@ -2361,7 +2358,7 @@ public class StreamTaskTest {
|
||||||
public void shouldThrowIfCleanClosingDirtyTask() {
|
public void shouldThrowIfCleanClosingDirtyTask() {
|
||||||
when(stateManager.taskId()).thenReturn(taskId);
|
when(stateManager.taskId()).thenReturn(taskId);
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
|
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
|
||||||
task.initializeIfNeeded();
|
task.initializeIfNeeded();
|
||||||
task.completeRestoration(noOpResetter -> { });
|
task.completeRestoration(noOpResetter -> { });
|
||||||
|
|
||||||
|
@ -2452,7 +2449,7 @@ public class StreamTaskTest {
|
||||||
streamsMetrics,
|
streamsMetrics,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time);
|
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", time);
|
||||||
|
|
||||||
// The processor topology is missing the topics
|
// The processor topology is missing the topics
|
||||||
final ProcessorTopology topology = withSources(emptyList(), mkMap());
|
final ProcessorTopology topology = withSources(emptyList(), mkMap());
|
||||||
|
@ -2981,8 +2978,7 @@ public class StreamTaskTest {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private StreamTask createSingleSourceStateless(final StreamsConfig config,
|
private StreamTask createSingleSourceStateless(final StreamsConfig config) {
|
||||||
final String builtInMetricsVersion) {
|
|
||||||
final ProcessorTopology topology = withSources(
|
final ProcessorTopology topology = withSources(
|
||||||
asList(source1, processorStreamTime, processorSystemTime),
|
asList(source1, processorStreamTime, processorSystemTime),
|
||||||
mkMap(mkEntry(topic1, source1))
|
mkMap(mkEntry(topic1, source1))
|
||||||
|
@ -3005,7 +3001,7 @@ public class StreamTaskTest {
|
||||||
topology,
|
topology,
|
||||||
consumer,
|
consumer,
|
||||||
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
||||||
new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, time),
|
new StreamsMetricsImpl(metrics, "test", time),
|
||||||
stateDirectory,
|
stateDirectory,
|
||||||
cache,
|
cache,
|
||||||
time,
|
time,
|
||||||
|
@ -3042,7 +3038,7 @@ public class StreamTaskTest {
|
||||||
topology,
|
topology,
|
||||||
consumer,
|
consumer,
|
||||||
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time),
|
new StreamsMetricsImpl(metrics, "test", time),
|
||||||
stateDirectory,
|
stateDirectory,
|
||||||
cache,
|
cache,
|
||||||
time,
|
time,
|
||||||
|
@ -3078,7 +3074,7 @@ public class StreamTaskTest {
|
||||||
topology,
|
topology,
|
||||||
consumer,
|
consumer,
|
||||||
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time),
|
new StreamsMetricsImpl(metrics, "test", time),
|
||||||
stateDirectory,
|
stateDirectory,
|
||||||
cache,
|
cache,
|
||||||
time,
|
time,
|
||||||
|
|
|
@ -307,7 +307,6 @@ public class StreamThreadTest {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
APPLICATION_ID,
|
APPLICATION_ID,
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
time
|
time
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -715,7 +714,6 @@ public class StreamThreadTest {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
APPLICATION_ID,
|
APPLICATION_ID,
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
mockTime
|
mockTime
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -780,7 +778,6 @@ public class StreamThreadTest {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
APPLICATION_ID,
|
APPLICATION_ID,
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
mockTime
|
mockTime
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1145,7 +1142,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
|
|
||||||
|
@ -1368,7 +1365,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
||||||
|
@ -1422,7 +1419,7 @@ public class StreamThreadTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
|
|
||||||
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
|
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
|
||||||
final StreamsConfig config = new StreamsConfig(props);
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
|
@ -1468,7 +1465,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
||||||
|
@ -1488,7 +1485,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
||||||
|
@ -1889,7 +1886,6 @@ public class StreamThreadTest {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
APPLICATION_ID,
|
APPLICATION_ID,
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
mockTime
|
mockTime
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -2592,7 +2588,7 @@ public class StreamThreadTest {
|
||||||
doThrow(new TaskMigratedException("Task lost exception", new RuntimeException())).when(taskManager).handleLostAll();
|
doThrow(new TaskMigratedException("Task lost exception", new RuntimeException())).when(taskManager).handleLostAll();
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
||||||
|
@ -2622,7 +2618,7 @@ public class StreamThreadTest {
|
||||||
doThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions);
|
doThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
|
||||||
|
@ -2654,7 +2650,7 @@ public class StreamThreadTest {
|
||||||
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
|
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -2712,7 +2708,7 @@ public class StreamThreadTest {
|
||||||
doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
|
doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -2779,7 +2775,7 @@ public class StreamThreadTest {
|
||||||
doNothing().when(taskManager).handleLostAll();
|
doNothing().when(taskManager).handleLostAll();
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -2842,7 +2838,7 @@ public class StreamThreadTest {
|
||||||
doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
|
doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -2902,7 +2898,7 @@ public class StreamThreadTest {
|
||||||
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
|
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -3086,7 +3082,7 @@ public class StreamThreadTest {
|
||||||
when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
|
when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
|
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
|
||||||
|
@ -3111,7 +3107,7 @@ public class StreamThreadTest {
|
||||||
final TaskManager taskManager = mock(TaskManager.class);
|
final TaskManager taskManager = mock(TaskManager.class);
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -3167,7 +3163,7 @@ public class StreamThreadTest {
|
||||||
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
|
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
|
||||||
final TaskManager taskManager = mock(TaskManager.class);
|
final TaskManager taskManager = mock(TaskManager.class);
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||||
topologyMetadata.buildAndRewriteTopology();
|
topologyMetadata.buildAndRewriteTopology();
|
||||||
thread = new StreamThread(
|
thread = new StreamThread(
|
||||||
|
@ -3583,7 +3579,7 @@ public class StreamThreadTest {
|
||||||
"",
|
"",
|
||||||
taskManager,
|
taskManager,
|
||||||
null,
|
null,
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime),
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
|
||||||
topologyMetadata,
|
topologyMetadata,
|
||||||
"thread-id",
|
"thread-id",
|
||||||
new LogContext(),
|
new LogContext(),
|
||||||
|
@ -3633,7 +3629,7 @@ public class StreamThreadTest {
|
||||||
final LogContext logContext = new LogContext("test");
|
final LogContext logContext = new LogContext("test");
|
||||||
final Logger log = logContext.logger(StreamThreadTest.class);
|
final Logger log = logContext.logger(StreamThreadTest.class);
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
|
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
|
||||||
new TopologyMetadata(internalTopologyBuilder, config),
|
new TopologyMetadata(internalTopologyBuilder, config),
|
||||||
config,
|
config,
|
||||||
|
@ -3692,7 +3688,7 @@ public class StreamThreadTest {
|
||||||
final StreamsConfig config,
|
final StreamsConfig config,
|
||||||
final TopologyMetadata topologyMetadata) {
|
final TopologyMetadata topologyMetadata) {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
|
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
|
||||||
|
|
||||||
return new StreamThread(
|
return new StreamThread(
|
||||||
mockTime,
|
mockTime,
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
|
||||||
import org.apache.kafka.common.metrics.stats.Rate;
|
import org.apache.kafka.common.metrics.stats.Rate;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
|
||||||
import org.apache.kafka.test.StreamsTestUtils;
|
import org.apache.kafka.test.StreamsTestUtils;
|
||||||
|
@ -93,7 +92,6 @@ public class StreamsMetricsImplTest {
|
||||||
private static final String SENSOR_NAME_1 = "sensor1";
|
private static final String SENSOR_NAME_1 = "sensor1";
|
||||||
private static final String SENSOR_NAME_2 = "sensor2";
|
private static final String SENSOR_NAME_2 = "sensor2";
|
||||||
private static final String INTERNAL_PREFIX = "internal";
|
private static final String INTERNAL_PREFIX = "internal";
|
||||||
private static final String VERSION = StreamsConfig.METRICS_LATEST;
|
|
||||||
private static final String CLIENT_ID = "test-client";
|
private static final String CLIENT_ID = "test-client";
|
||||||
private static final String THREAD_ID1 = "test-thread-1";
|
private static final String THREAD_ID1 = "test-thread-1";
|
||||||
private static final String TASK_ID1 = "test-task-1";
|
private static final String TASK_ID1 = "test-task-1";
|
||||||
|
@ -139,7 +137,7 @@ public class StreamsMetricsImplTest {
|
||||||
private final MetricName metricName2 =
|
private final MetricName metricName2 =
|
||||||
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags);
|
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags);
|
||||||
private final MockTime time = new MockTime(0);
|
private final MockTime time = new MockTime(0);
|
||||||
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) {
|
private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) {
|
||||||
final StringBuffer message = new StringBuffer();
|
final StringBuffer message = new StringBuffer();
|
||||||
|
@ -254,7 +252,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
|
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
|
||||||
|
|
||||||
|
@ -266,7 +264,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
|
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
|
||||||
|
|
||||||
|
@ -278,7 +276,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
|
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -295,7 +293,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
|
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -312,7 +310,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
|
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -331,7 +329,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
|
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -350,7 +348,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel);
|
final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
|
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
|
||||||
TASK_ID1,
|
TASK_ID1,
|
||||||
|
@ -368,7 +366,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
|
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
|
||||||
TASK_ID1,
|
TASK_ID1,
|
||||||
|
@ -384,7 +382,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
|
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
|
||||||
|
@ -396,7 +394,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
|
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
|
@ -408,7 +406,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
|
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
|
@ -420,7 +418,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException {
|
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
final Thread otherThread =
|
final Thread otherThread =
|
||||||
|
@ -435,7 +433,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
|
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
|
||||||
|
@ -459,7 +457,7 @@ public class StreamsMetricsImplTest {
|
||||||
.thenReturn(metricName);
|
.thenReturn(metricName);
|
||||||
when(metrics.metric(metricName)).thenReturn(null);
|
when(metrics.metric(metricName)).thenReturn(null);
|
||||||
when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
|
when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.addStoreLevelMutableMetric(
|
streamsMetrics.addStoreLevelMutableMetric(
|
||||||
TASK_ID1,
|
TASK_ID1,
|
||||||
|
@ -491,7 +489,7 @@ public class StreamsMetricsImplTest {
|
||||||
when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
|
when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
|
||||||
.thenReturn(metricName);
|
.thenReturn(metricName);
|
||||||
when(metrics.metric(metricName)).thenReturn(null);
|
when(metrics.metric(metricName)).thenReturn(null);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.addStoreLevelMutableMetric(
|
streamsMetrics.addStoreLevelMutableMetric(
|
||||||
TASK_ID1,
|
TASK_ID1,
|
||||||
|
@ -539,7 +537,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldRemoveStateStoreLevelSensors() {
|
public void shouldRemoveStateStoreLevelSensors() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
final MetricName metricName1 =
|
final MetricName metricName1 =
|
||||||
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
|
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
|
||||||
final MetricName metricName2 =
|
final MetricName metricName2 =
|
||||||
|
@ -562,7 +560,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
|
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -580,7 +578,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
|
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -599,7 +597,7 @@ public class StreamsMetricsImplTest {
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
final String processorCacheName = "processorNodeName";
|
final String processorCacheName = "processorNodeName";
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
|
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
|
||||||
THREAD_ID1,
|
THREAD_ID1,
|
||||||
|
@ -618,7 +616,7 @@ public class StreamsMetricsImplTest {
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
final String processorCacheName = "processorNodeName";
|
final String processorCacheName = "processorNodeName";
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
|
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
|
||||||
THREAD_ID1, TASK_ID1,
|
THREAD_ID1, TASK_ID1,
|
||||||
|
@ -635,7 +633,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetNewSensorTest(metrics, recordingLevel);
|
setupGetNewSensorTest(metrics, recordingLevel);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
|
final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
|
||||||
|
|
||||||
|
@ -647,7 +645,7 @@ public class StreamsMetricsImplTest {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
final RecordingLevel recordingLevel = RecordingLevel.INFO;
|
||||||
setupGetExistingSensorTest(metrics);
|
setupGetExistingSensorTest(metrics);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
|
final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
|
||||||
|
|
||||||
|
@ -664,7 +662,7 @@ public class StreamsMetricsImplTest {
|
||||||
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
|
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
|
||||||
.thenReturn(metricName1);
|
.thenReturn(metricName1);
|
||||||
doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue));
|
doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue));
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value);
|
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value);
|
||||||
}
|
}
|
||||||
|
@ -678,7 +676,7 @@ public class StreamsMetricsImplTest {
|
||||||
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
|
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
|
||||||
.thenReturn(metricName1);
|
.thenReturn(metricName1);
|
||||||
doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider));
|
doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider));
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
|
|
||||||
streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider);
|
streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider);
|
||||||
}
|
}
|
||||||
|
@ -699,7 +697,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldRemoveClientLevelMetricsAndSensors() {
|
public void shouldRemoveClientLevelMetricsAndSensors() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
final ArgumentCaptor<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
|
final ArgumentCaptor<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
|
||||||
|
|
||||||
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
|
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
|
||||||
|
@ -712,7 +710,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldRemoveThreadLevelSensors() {
|
public void shouldRemoveThreadLevelSensors() {
|
||||||
final Metrics metrics = mock(Metrics.class);
|
final Metrics metrics = mock(Metrics.class);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
addSensorsOnAllLevels(metrics, streamsMetrics);
|
addSensorsOnAllLevels(metrics, streamsMetrics);
|
||||||
setupRemoveSensorsTest(metrics, THREAD_ID1);
|
setupRemoveSensorsTest(metrics, THREAD_ID1);
|
||||||
|
|
||||||
|
@ -721,7 +719,7 @@ public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNullMetrics() {
|
public void testNullMetrics() {
|
||||||
assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", VERSION, time));
|
assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", time));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -754,7 +752,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void testMultiLevelSensorRemoval() {
|
public void testMultiLevelSensorRemoval() {
|
||||||
final Metrics registry = new Metrics();
|
final Metrics registry = new Metrics();
|
||||||
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, VERSION, time);
|
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, time);
|
||||||
for (final MetricName defaultMetric : registry.metrics().keySet()) {
|
for (final MetricName defaultMetric : registry.metrics().keySet()) {
|
||||||
registry.removeMetric(defaultMetric);
|
registry.removeMetric(defaultMetric);
|
||||||
}
|
}
|
||||||
|
@ -860,7 +858,7 @@ public class StreamsMetricsImplTest {
|
||||||
final MockTime time = new MockTime(1);
|
final MockTime time = new MockTime(1);
|
||||||
final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS);
|
final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS);
|
||||||
final Metrics metrics = new Metrics(config, time);
|
final Metrics metrics = new Metrics(config, time);
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", time);
|
||||||
|
|
||||||
final String scope = "scope";
|
final String scope = "scope";
|
||||||
final String entity = "entity";
|
final String entity = "entity";
|
||||||
|
@ -894,7 +892,7 @@ public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldAddLatencyRateTotalSensor() {
|
public void shouldAddLatencyRateTotalSensor() {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
shouldAddCustomSensor(
|
shouldAddCustomSensor(
|
||||||
streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG),
|
streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG),
|
||||||
streamsMetrics,
|
streamsMetrics,
|
||||||
|
@ -909,7 +907,7 @@ public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldAddRateTotalSensor() {
|
public void shouldAddRateTotalSensor() {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
|
||||||
shouldAddCustomSensor(
|
shouldAddCustomSensor(
|
||||||
streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG),
|
streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG),
|
||||||
streamsMetrics,
|
streamsMetrics,
|
||||||
|
@ -1044,7 +1042,7 @@ public class StreamsMetricsImplTest {
|
||||||
final String taskName = "test-task";
|
final String taskName = "test-task";
|
||||||
final String storeType = "remote-window";
|
final String storeType = "remote-window";
|
||||||
final String storeName = "window-keeper";
|
final String storeName = "window-keeper";
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
|
|
||||||
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
|
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
|
||||||
|
|
||||||
|
@ -1059,7 +1057,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldGetCacheLevelTagMap() {
|
public void shouldGetCacheLevelTagMap() {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
final String taskName = "taskName";
|
final String taskName = "taskName";
|
||||||
final String storeName = "storeName";
|
final String storeName = "storeName";
|
||||||
|
|
||||||
|
@ -1076,7 +1074,7 @@ public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldGetThreadLevelTagMap() {
|
public void shouldGetThreadLevelTagMap() {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
|
|
||||||
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1);
|
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1);
|
||||||
|
|
||||||
|
@ -1209,7 +1207,7 @@ public class StreamsMetricsImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldReturnMetricsVersionCurrent() {
|
public void shouldReturnMetricsVersionCurrent() {
|
||||||
assertThat(
|
assertThat(
|
||||||
new StreamsMetricsImpl(metrics, THREAD_ID1, StreamsConfig.METRICS_LATEST, time).version(),
|
new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(),
|
||||||
equalTo(Version.LATEST)
|
equalTo(Version.LATEST)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1268,7 +1266,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldAddThreadLevelMutableMetric() {
|
public void shouldAddThreadLevelMutableMetric() {
|
||||||
final int measuredValue = 123;
|
final int measuredValue = 123;
|
||||||
final StreamsMetricsImpl streamsMetrics
|
final StreamsMetricsImpl streamsMetrics
|
||||||
= new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
= new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
|
|
||||||
streamsMetrics.addThreadLevelMutableMetric(
|
streamsMetrics.addThreadLevelMutableMetric(
|
||||||
"foobar",
|
"foobar",
|
||||||
|
@ -1290,7 +1288,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldCleanupThreadLevelMutableMetric() {
|
public void shouldCleanupThreadLevelMutableMetric() {
|
||||||
final int measuredValue = 123;
|
final int measuredValue = 123;
|
||||||
final StreamsMetricsImpl streamsMetrics
|
final StreamsMetricsImpl streamsMetrics
|
||||||
= new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
= new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
streamsMetrics.addThreadLevelMutableMetric(
|
streamsMetrics.addThreadLevelMutableMetric(
|
||||||
"foobar",
|
"foobar",
|
||||||
"test metric",
|
"test metric",
|
||||||
|
@ -1312,7 +1310,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldAddThreadLevelImmutableMetric() {
|
public void shouldAddThreadLevelImmutableMetric() {
|
||||||
final int measuredValue = 123;
|
final int measuredValue = 123;
|
||||||
final StreamsMetricsImpl streamsMetrics
|
final StreamsMetricsImpl streamsMetrics
|
||||||
= new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
= new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
|
|
||||||
streamsMetrics.addThreadLevelImmutableMetric(
|
streamsMetrics.addThreadLevelImmutableMetric(
|
||||||
"foobar",
|
"foobar",
|
||||||
|
@ -1334,7 +1332,7 @@ public class StreamsMetricsImplTest {
|
||||||
public void shouldCleanupThreadLevelImmutableMetric() {
|
public void shouldCleanupThreadLevelImmutableMetric() {
|
||||||
final int measuredValue = 123;
|
final int measuredValue = 123;
|
||||||
final StreamsMetricsImpl streamsMetrics
|
final StreamsMetricsImpl streamsMetrics
|
||||||
= new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
|
= new StreamsMetricsImpl(metrics, THREAD_ID1, time);
|
||||||
streamsMetrics.addThreadLevelImmutableMetric(
|
streamsMetrics.addThreadLevelImmutableMetric(
|
||||||
"foobar",
|
"foobar",
|
||||||
"test metric",
|
"test metric",
|
||||||
|
|
|
@ -1416,7 +1416,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -1452,7 +1452,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -1491,7 +1491,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -1532,7 +1532,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
|
|
@ -573,7 +573,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -613,7 +613,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -655,7 +655,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
@ -699,7 +699,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
|
||||||
dir,
|
dir,
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(props),
|
new StreamsConfig(props),
|
||||||
MockRecordCollector::new,
|
MockRecordCollector::new,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
|
||||||
TestUtils.tempDirectory(),
|
TestUtils.tempDirectory(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
Serdes.Long(),
|
Serdes.Long(),
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
streamsConfig,
|
streamsConfig,
|
||||||
() -> collector,
|
() -> collector,
|
||||||
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
|
||||||
|
|
|
@ -113,7 +113,7 @@ public class GlobalStateStoreProviderTest {
|
||||||
when(mockContext.applicationId()).thenReturn("appId");
|
when(mockContext.applicationId()).thenReturn("appId");
|
||||||
when(mockContext.metrics())
|
when(mockContext.metrics())
|
||||||
.thenReturn(
|
.thenReturn(
|
||||||
new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST, new MockTime())
|
new StreamsMetricsImpl(new Metrics(), "threadName", new MockTime())
|
||||||
);
|
);
|
||||||
when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
|
when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
|
||||||
when(mockContext.appConfigs()).thenReturn(CONFIGS);
|
when(mockContext.appConfigs()).thenReturn(CONFIGS);
|
||||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
|
||||||
|
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
@ -58,7 +57,7 @@ public class KeyValueSegmentTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
metricsRecorder.init(
|
metricsRecorder.init(
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
|
||||||
new TaskId(0, 0)
|
new TaskId(0, 0)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.KeyValue;
|
import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||||
|
@ -125,7 +124,7 @@ public class MeteredKeyValueStoreTest {
|
||||||
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||||
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
||||||
when(context.metrics()).thenReturn(
|
when(context.metrics()).thenReturn(
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)
|
new StreamsMetricsImpl(metrics, "test", mockTime)
|
||||||
);
|
);
|
||||||
when(context.taskId()).thenReturn(taskId);
|
when(context.taskId()).thenReturn(taskId);
|
||||||
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.streams.KeyValue;
|
import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.kstream.Windowed;
|
import org.apache.kafka.streams.kstream.Windowed;
|
||||||
import org.apache.kafka.streams.kstream.internals.SessionWindow;
|
import org.apache.kafka.streams.kstream.internals.SessionWindow;
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
|
@ -128,7 +127,7 @@ public class MeteredSessionStoreTest {
|
||||||
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||||
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
||||||
when(context.metrics())
|
when(context.metrics())
|
||||||
.thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
|
.thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
|
||||||
when(context.taskId()).thenReturn(taskId);
|
when(context.taskId()).thenReturn(taskId);
|
||||||
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
||||||
when(innerStore.name()).thenReturn(STORE_NAME);
|
when(innerStore.name()).thenReturn(STORE_NAME);
|
||||||
|
|
|
@ -129,7 +129,7 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
setUpWithoutContext();
|
setUpWithoutContext();
|
||||||
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
||||||
when(context.metrics())
|
when(context.metrics())
|
||||||
.thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
|
.thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
|
||||||
when(context.taskId()).thenReturn(taskId);
|
when(context.taskId()).thenReturn(taskId);
|
||||||
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
|
||||||
when(inner.name()).thenReturn(STORE_NAME);
|
when(inner.name()).thenReturn(STORE_NAME);
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class MeteredTimestampedWindowStoreTest {
|
||||||
|
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test", new MockTime());
|
||||||
|
|
||||||
context = new InternalMockProcessorContext<>(
|
context = new InternalMockProcessorContext<>(
|
||||||
TestUtils.tempDirectory(),
|
TestUtils.tempDirectory(),
|
||||||
|
@ -106,7 +106,7 @@ public class MeteredTimestampedWindowStoreTest {
|
||||||
|
|
||||||
public void setUpWithoutContextName() {
|
public void setUpWithoutContextName() {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test", new MockTime());
|
||||||
|
|
||||||
context = new InternalMockProcessorContext<>(
|
context = new InternalMockProcessorContext<>(
|
||||||
TestUtils.tempDirectory(),
|
TestUtils.tempDirectory(),
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serializer;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||||
|
@ -112,7 +111,7 @@ public class MeteredVersionedKeyValueStoreTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
when(inner.name()).thenReturn(STORE_NAME);
|
when(inner.name()).thenReturn(STORE_NAME);
|
||||||
when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
|
when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
|
||||||
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
when(context.applicationId()).thenReturn(APPLICATION_ID);
|
||||||
when(context.taskId()).thenReturn(TASK_ID);
|
when(context.taskId()).thenReturn(TASK_ID);
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class MeteredWindowStoreTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(metrics, "test", new MockTime());
|
||||||
context = new InternalMockProcessorContext<>(
|
context = new InternalMockProcessorContext<>(
|
||||||
TestUtils.tempDirectory(),
|
TestUtils.tempDirectory(),
|
||||||
Serdes.String(),
|
Serdes.String(),
|
||||||
|
|
|
@ -919,7 +919,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
||||||
|
|
||||||
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
|
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
|
new StreamsMetricsImpl(metrics, "test-application", time);
|
||||||
|
|
||||||
context = mock(InternalMockProcessorContext.class);
|
context = mock(InternalMockProcessorContext.class);
|
||||||
when(context.metrics()).thenReturn(streamsMetrics);
|
when(context.metrics()).thenReturn(streamsMetrics);
|
||||||
|
@ -952,7 +952,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
||||||
|
|
||||||
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
|
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
|
new StreamsMetricsImpl(metrics, "test-application", time);
|
||||||
|
|
||||||
context = mock(InternalMockProcessorContext.class);
|
context = mock(InternalMockProcessorContext.class);
|
||||||
when(context.metrics()).thenReturn(streamsMetrics);
|
when(context.metrics()).thenReturn(streamsMetrics);
|
||||||
|
@ -984,7 +984,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
||||||
|
|
||||||
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
|
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
|
new StreamsMetricsImpl(metrics, "test-application", time);
|
||||||
|
|
||||||
final Properties props = StreamsTestUtils.getStreamsConfig();
|
final Properties props = StreamsTestUtils.getStreamsConfig();
|
||||||
context = mock(InternalMockProcessorContext.class);
|
context = mock(InternalMockProcessorContext.class);
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.api.Record;
|
import org.apache.kafka.streams.processor.api.Record;
|
||||||
|
@ -66,7 +65,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
|
||||||
when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
|
when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
|
||||||
final Metrics metrics = new Metrics();
|
final Metrics metrics = new Metrics();
|
||||||
offset = 0;
|
offset = 0;
|
||||||
streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new MockTime());
|
||||||
context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
|
context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
|
||||||
|
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
@ -58,7 +57,7 @@ public class TimestampedSegmentTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
metricsRecorder.init(
|
metricsRecorder.init(
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
|
||||||
new TaskId(0, 0)
|
new TaskId(0, 0)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
|
|
||||||
|
@ -203,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
|
||||||
|
|
||||||
private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
|
private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
|
||||||
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
||||||
|
|
||||||
recorder.init(streamsMetrics, TASK_ID);
|
recorder.init(streamsMetrics, TASK_ID);
|
||||||
|
@ -220,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
|
||||||
|
|
||||||
private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
|
private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
|
||||||
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
||||||
|
|
||||||
recorder.init(streamsMetrics, TASK_ID);
|
recorder.init(streamsMetrics, TASK_ID);
|
||||||
|
@ -237,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
|
||||||
|
|
||||||
private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
|
private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
|
||||||
final StreamsMetricsImpl streamsMetrics =
|
final StreamsMetricsImpl streamsMetrics =
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
|
||||||
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
|
||||||
|
|
||||||
recorder.init(streamsMetrics, TASK_ID);
|
recorder.init(streamsMetrics, TASK_ID);
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals.metrics;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
|
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
|
||||||
|
@ -179,7 +178,7 @@ public class RocksDBMetricsRecorderTest {
|
||||||
assertThrows(
|
assertThrows(
|
||||||
IllegalStateException.class,
|
IllegalStateException.class,
|
||||||
() -> recorder.init(
|
() -> recorder.init(
|
||||||
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
|
||||||
TASK_ID1
|
TASK_ID1
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
|
@ -89,7 +89,7 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
this(null,
|
this(null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
@ -106,7 +106,6 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
new StreamsMetricsImpl(
|
new StreamsMetricsImpl(
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
"mock",
|
"mock",
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
new MockTime()
|
new MockTime()
|
||||||
),
|
),
|
||||||
config,
|
config,
|
||||||
|
@ -139,7 +138,6 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
new StreamsMetricsImpl(
|
new StreamsMetricsImpl(
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
"mock",
|
"mock",
|
||||||
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
new MockTime()
|
new MockTime()
|
||||||
),
|
),
|
||||||
config,
|
config,
|
||||||
|
@ -157,7 +155,7 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
stateDir,
|
stateDir,
|
||||||
keySerde,
|
keySerde,
|
||||||
valueSerde,
|
valueSerde,
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
config,
|
config,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
@ -177,7 +175,7 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
null,
|
null,
|
||||||
serdes.keySerde(),
|
serdes.keySerde(),
|
||||||
serdes.valueSerde(),
|
serdes.valueSerde(),
|
||||||
new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(metrics, "mock", new MockTime()),
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
||||||
() -> collector,
|
() -> collector,
|
||||||
null,
|
null,
|
||||||
|
@ -194,7 +192,7 @@ public class InternalMockProcessorContext<KOut, VOut>
|
||||||
stateDir,
|
stateDir,
|
||||||
keySerde,
|
keySerde,
|
||||||
valueSerde,
|
valueSerde,
|
||||||
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
|
new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
||||||
() -> collector,
|
() -> collector,
|
||||||
cache,
|
cache,
|
||||||
|
|
|
@ -403,7 +403,6 @@ public class TopologyTestDriver implements Closeable {
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
|
||||||
metrics,
|
metrics,
|
||||||
"test-client",
|
"test-client",
|
||||||
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
mockWallClockTime
|
mockWallClockTime
|
||||||
);
|
);
|
||||||
TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
|
TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
|
||||||
|
|
|
@ -242,7 +242,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
|
||||||
this.metrics = new StreamsMetricsImpl(
|
this.metrics = new StreamsMetricsImpl(
|
||||||
new Metrics(metricConfig),
|
new Metrics(metricConfig),
|
||||||
threadId,
|
threadId,
|
||||||
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
Time.SYSTEM
|
Time.SYSTEM
|
||||||
);
|
);
|
||||||
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
|
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
|
||||||
|
|
|
@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
|
||||||
metrics = new StreamsMetricsImpl(
|
metrics = new StreamsMetricsImpl(
|
||||||
new Metrics(metricConfig),
|
new Metrics(metricConfig),
|
||||||
threadId,
|
threadId,
|
||||||
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
|
||||||
Time.SYSTEM
|
Time.SYSTEM
|
||||||
);
|
);
|
||||||
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
|
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
|
||||||
|
|
|
@ -232,7 +232,6 @@ public class MockProcessorContextTest {
|
||||||
when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(
|
when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(
|
||||||
new Metrics(new MetricConfig()),
|
new Metrics(new MetricConfig()),
|
||||||
Thread.currentThread().getName(),
|
Thread.currentThread().getName(),
|
||||||
"",
|
|
||||||
Time.SYSTEM
|
Time.SYSTEM
|
||||||
));
|
));
|
||||||
when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));
|
when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));
|
||||||
|
|
Loading…
Reference in New Issue