diff --git a/build.gradle b/build.gradle index 07200a80b5f..516a06cfed2 100644 --- a/build.gradle +++ b/build.gradle @@ -1120,6 +1120,8 @@ project(':streams') { testCompile libs.log4j testCompile libs.junit testCompile libs.easymock + testCompile libs.powermockJunit4 + testCompile libs.powermockEasymock testCompile libs.bcpkix testCompile libs.hamcrest diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 8fd4f5b4b7d..ca7266f5aba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -57,6 +59,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier { private TimestampedKeyValueStore store; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; private TimestampedTupleForwarder tupleForwarder; @SuppressWarnings("unchecked") @@ -64,6 +67,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier) context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( store, @@ -80,7 +84,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); @@ -57,12 +58,14 @@ class KStreamKStreamJoin implements ProcessorSupplier { private WindowStore otherWindow; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); otherWindow = (WindowStore) context.getStateStore(otherWindowName); } @@ -81,7 +84,7 @@ class KStreamKStreamJoin implements ProcessorSupplier { "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index dcf0799c190..92fd4d52e5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ class KStreamKTableJoinProcessor extends AbstractProcessor joiner; private final boolean leftJoin; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, final KeyValueMapper keyMapper, @@ -49,6 +52,8 @@ class KStreamKTableJoinProcessor extends AbstractProcessor extends AbstractProcessor implements KStreamAggProcessorSupplier store; private TimestampedTupleForwarder tupleForwarder; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); store = (TimestampedKeyValueStore) context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( store, @@ -78,7 +82,7 @@ public class KStreamReduce implements KStreamAggProcessorSupplier implements KStreamAggProce private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private Sensor skippedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @@ -92,6 +94,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce internalProcessorContext = (InternalProcessorContext) context; metrics = (StreamsMetricsImpl) context.metrics(); lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); store = (SessionStore) context.getStateStore(storeName); tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues); @@ -106,7 +109,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 3458ca02e1c..2983a3a9c7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -79,6 +80,7 @@ public class KStreamWindowAggregate implements KStr private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private Sensor skippedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @@ -86,8 +88,11 @@ public class KStreamWindowAggregate implements KStr public void init(final ProcessorContext context) { super.init(context); internalProcessorContext = (InternalProcessorContext) context; - metrics = (StreamsMetricsImpl) context.metrics(); + + metrics = internalProcessorContext.metrics(); + lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); windowStore = (TimestampedWindowStore) context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( windowStore, @@ -103,7 +108,7 @@ public class KStreamWindowAggregate implements KStr "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java index 2d4cbc8415c..005ea809b3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,7 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -75,6 +78,7 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableLeftJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -74,6 +77,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableOuterJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -73,6 +76,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableRightJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -72,6 +75,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin implements ProcessorSupplier { private TimestampedKeyValueStore store; private TimestampedTupleForwarder tupleForwarder; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); if (queryableName != null) { store = (TimestampedKeyValueStore) context.getStateStore(queryableName); tupleForwarder = new TimestampedTupleForwarder<>( @@ -94,7 +98,7 @@ public class KTableSource implements ProcessorSupplier { "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]", context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 29c861e4291..d6f60e35133 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import java.io.IOException; import java.util.HashMap; @@ -69,7 +70,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { source, deserializationExceptionHandler, logContext, - processorContext.metrics().skippedRecordsSensor() + ThreadMetrics.skipRecordSensor(processorContext.metrics()) ) ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 86f5be383cd..6f3e70b9922 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -18,17 +18,17 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.slf4j.Logger; import java.util.ArrayDeque; - /** * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition @@ -48,6 +48,8 @@ public class RecordQueue { private StampedRecord headRecord = null; + private Sensor skipRecordsSensor; + RecordQueue(final TopicPartition partition, final SourceNode source, final TimestampExtractor timestampExtractor, @@ -58,13 +60,14 @@ public class RecordQueue { this.partition = partition; this.fifoQueue = new ArrayDeque<>(); this.timestampExtractor = timestampExtractor; - this.recordDeserializer = new RecordDeserializer( + this.processorContext = processorContext; + skipRecordsSensor = ThreadMetrics.skipRecordSensor(processorContext.metrics()); + recordDeserializer = new RecordDeserializer( source, deserializationExceptionHandler, logContext, - processorContext.metrics().skippedRecordsSensor() + skipRecordsSensor ); - this.processorContext = processorContext; this.log = logContext.logger(RecordQueue.class); } @@ -180,7 +183,8 @@ public class RecordQueue { "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() ); - ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); + + skipRecordsSensor.record(); continue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 80653c5e5d4..4fd57ba3db8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; @@ -78,7 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator private final PunctuationQueue systemTimePunctuationQueue; private final ProducerSupplier producerSupplier; - private Sensor closeSensor; + private Sensor closeTaskSensor; private long idleStartTime; private Producer producer; private boolean commitRequested = false; @@ -96,24 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final String group = "stream-task-metrics"; // first add the global operation metrics if not yet, with the global tags only - final Map allTagMap = metrics.tagMap("task-id", "all"); - final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG); - parent.add( - new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap), - new Avg() - ); - parent.add( - new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap), - new Max() - ); - parent.add( - new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap), - new Rate(TimeUnit.SECONDS, new Count()) - ); - parent.add( - new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap), - new CumulativeCount() - ); + final Sensor parent = ThreadMetrics.commitOverTasksSensor(metrics); // add the operation metrics with additional tags final Map tagMap = metrics.tagMap("task-id", taskName); @@ -167,9 +151,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final StateDirectory stateDirectory, final ThreadCache cache, final Time time, - final ProducerSupplier producerSupplier, - final Sensor closeSensor) { - this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null, closeSensor); + final ProducerSupplier producerSupplier) { + this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null); } public StreamTask(final TaskId id, @@ -178,20 +161,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final Consumer consumer, final ChangelogReader changelogReader, final StreamsConfig config, - final StreamsMetricsImpl metrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ThreadCache cache, final Time time, final ProducerSupplier producerSupplier, - final RecordCollector recordCollector, - final Sensor closeSensor) { + final RecordCollector recordCollector) { super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config); this.time = time; this.producerSupplier = producerSupplier; this.producer = producerSupplier.get(); - this.closeSensor = closeSensor; - this.taskMetrics = new TaskMetrics(id, metrics); + this.taskMetrics = new TaskMetrics(id, streamsMetrics); + + closeTaskSensor = ThreadMetrics.closeTaskSensor(streamsMetrics); final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler(); @@ -200,8 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator id.toString(), logContext, productionExceptionHandler, - metrics.skippedRecordsSensor() - ); + ThreadMetrics.skipRecordSensor(streamsMetrics)); } else { this.recordCollector = recordCollector; } @@ -220,7 +202,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final Map partitionQueues = new HashMap<>(); // initialize the topology with its own context - final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache); + final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache); processorContext = processorContextImpl; final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); @@ -691,7 +673,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator partitionGroup.close(); taskMetrics.removeAllSensors(); - closeSensor.record(); + closeTaskSensor.record(); if (firstException != null) { throw firstException; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 419e1811963..4dc1bde9f3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -31,9 +31,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Count; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -45,8 +42,8 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; -import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -62,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; @@ -340,7 +336,7 @@ public class StreamThread extends Thread { final String applicationId; final InternalTopologyBuilder builder; final StreamsConfig config; - final StreamsMetricsThreadImpl streamsMetrics; + final StreamsMetricsImpl streamsMetrics; final StateDirectory stateDirectory; final ChangelogReader storeChangelogReader; final Time time; @@ -349,7 +345,7 @@ public class StreamThread extends Thread { AbstractTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final Time time, @@ -398,10 +394,11 @@ public class StreamThread extends Thread { private final KafkaClientSupplier clientSupplier; private final String threadClientId; private final Producer threadProducer; + private final Sensor createTaskSensor; TaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final ThreadCache cache, @@ -422,13 +419,14 @@ public class StreamThread extends Thread { this.clientSupplier = clientSupplier; this.threadProducer = threadProducer; this.threadClientId = threadClientId; + createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics); } @Override StreamTask createTask(final Consumer consumer, final TaskId taskId, final Set partitions) { - streamsMetrics.taskCreatedSensor.record(); + createTaskSensor.record(); return new StreamTask( taskId, @@ -441,8 +439,7 @@ public class StreamThread extends Thread { stateDirectory, cache, time, - () -> createProducer(taskId), - streamsMetrics.taskClosedSensor); + () -> createProducer(taskId)); } private Producer createProducer(final TaskId id) { @@ -470,9 +467,11 @@ public class StreamThread extends Thread { } static class StandbyTaskCreator extends AbstractTaskCreator { + private final Sensor createTaskSensor; + StandbyTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final Time time, @@ -485,13 +484,14 @@ public class StreamThread extends Thread { storeChangelogReader, time, log); + createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics); } @Override StandbyTask createTask(final Consumer consumer, final TaskId taskId, final Set partitions) { - streamsMetrics.taskCreatedSensor.record(); + createTaskSensor.record(); final ProcessorTopology topology = builder.build(taskId.topicGroupId); @@ -516,47 +516,6 @@ public class StreamThread extends Thread { } } - static class StreamsMetricsThreadImpl extends StreamsMetricsImpl { - - private final Sensor commitTimeSensor; - private final Sensor pollTimeSensor; - private final Sensor processTimeSensor; - private final Sensor punctuateTimeSensor; - private final Sensor taskCreatedSensor; - private final Sensor taskClosedSensor; - - StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) { - super(metrics, threadName); - final String group = "stream-metrics"; - - commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit"); - addInvocationRateAndCount(commitTimeSensor, group, tagMap(), "commit"); - - pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll"); - // can't use addInvocationRateAndCount due to non-standard description string - pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount()); - - processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(processTimeSensor, group, tagMap(), "process"); - addInvocationRateAndCount(processTimeSensor, group, tagMap(), "process"); - - punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(punctuateTimeSensor, group, tagMap(), "punctuate"); - addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(), "punctuate"); - - taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO); - taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tagMap()), new Total()); - - taskClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO); - taskClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - taskClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tagMap()), new Total()); - } - } - private final Time time; private final Logger log; private final String logPrefix; @@ -566,9 +525,14 @@ public class StreamThread extends Thread { private final int maxPollTimeMs; private final String originalReset; private final TaskManager taskManager; - private final StreamsMetricsThreadImpl streamsMetrics; private final AtomicInteger assignmentErrorCode; + private final StreamsMetricsImpl streamsMetrics; + private final Sensor commitSensor; + private final Sensor pollSensor; + private final Sensor punctuateSensor; + private final Sensor processSensor; + private long now; private long lastPollMs; private long lastCommitMs; @@ -620,10 +584,7 @@ public class StreamThread extends Thread { threadProducer = clientSupplier.getProducer(producerConfigs); } - final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl( - metrics, - threadClientId - ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); @@ -697,7 +658,7 @@ public class StreamThread extends Thread { final Consumer consumer, final String originalReset, final TaskManager taskManager, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final InternalTopologyBuilder builder, final String threadClientId, final LogContext logContext, @@ -707,9 +668,24 @@ public class StreamThread extends Thread { this.stateLock = new Object(); this.standbyRecords = new HashMap<>(); + this.streamsMetrics = streamsMetrics; + this.commitSensor = ThreadMetrics.commitSensor(streamsMetrics); + this.pollSensor = ThreadMetrics.pollSensor(streamsMetrics); + this.processSensor = ThreadMetrics.processSensor(streamsMetrics); + this.punctuateSensor = ThreadMetrics.punctuateSensor(streamsMetrics); + + // The following sensors are created here but their references are not stored in this object, since within + // this object they are not recorded. The sensors are created here so that the stream threads starts with all + // its metrics initialised. Otherwise, those sensors would have been created during processing, which could + // lead to missing metrics. For instance, if no task were created, the metrics for created and closed + // tasks would never be added to the metrics. + ThreadMetrics.createTaskSensor(streamsMetrics); + ThreadMetrics.closeTaskSensor(streamsMetrics); + ThreadMetrics.skipRecordSensor(streamsMetrics); + ThreadMetrics.commitOverTasksSensor(streamsMetrics); + this.time = time; this.builder = builder; - this.streamsMetrics = streamsMetrics; this.logPrefix = logContext.logPrefix(); this.log = logContext.logger(StreamThread.class); this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log); @@ -857,7 +833,7 @@ public class StreamThread extends Thread { final long pollLatency = advanceNowAndComputeLatency(); if (records != null && !records.isEmpty()) { - streamsMetrics.pollTimeSensor.record(pollLatency, now); + pollSensor.record(pollLatency, now); addRecordsToTasks(records); } @@ -891,14 +867,14 @@ public class StreamThread extends Thread { if (processed > 0) { final long processLatency = advanceNowAndComputeLatency(); - streamsMetrics.processTimeSensor.record(processLatency / (double) processed, now); + processSensor.record(processLatency / (double) processed, now); // commit any tasks that have requested a commit final int committed = taskManager.maybeCommitActiveTasksPerUserRequested(); if (committed > 0) { final long commitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now); + commitSensor.record(commitLatency / (double) committed, now); } } else { // if there is no records to be processed, exit immediately @@ -1031,7 +1007,7 @@ public class StreamThread extends Thread { final int punctuated = taskManager.punctuate(); if (punctuated > 0) { final long punctuateLatency = advanceNowAndComputeLatency(); - streamsMetrics.punctuateTimeSensor.record(punctuateLatency / (double) punctuated, now); + punctuateSensor.record(punctuateLatency / (double) punctuated, now); } return punctuated > 0; @@ -1057,7 +1033,7 @@ public class StreamThread extends Thread { committed += taskManager.commitAll(); if (committed > 0) { final long intervalCommitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now); + commitSensor.record(intervalCommitLatency / (double) committed, now); // try to purge the committed records for repartition topics if possible taskManager.maybePurgeCommitedRecords(); @@ -1074,7 +1050,7 @@ public class StreamThread extends Thread { final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested(); if (commitPerRequested > 0) { final long requestCommitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now); + commitSensor.record(requestCommitLatency / (double) committed, now); committed += commitPerRequested; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 0a47fce8101..46b5669d601 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -20,11 +20,11 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.streams.StreamsMetrics; import java.util.Arrays; @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; public class StreamsMetricsImpl implements StreamsMetrics { private final Metrics metrics; private final Map parentSensors; - private final Sensor skippedRecordsSensor; private final String threadName; private final Deque threadLevelSensors = new LinkedList<>(); @@ -52,6 +51,20 @@ public class StreamsMetricsImpl implements StreamsMetrics { private static final String SENSOR_PREFIX_DELIMITER = "."; private static final String SENSOR_NAME_DELIMITER = ".s."; + public static final String THREAD_ID_TAG = "client-id"; + public static final String TASK_ID_TAG = "task-id"; + + public static final String ALL_TASKS = "all"; + + public static final String LATENCY_SUFFIX = "-latency"; + public static final String AVG_SUFFIX = "-avg"; + public static final String MAX_SUFFIX = "-max"; + public static final String RATE_SUFFIX = "-rate"; + public static final String TOTAL_SUFFIX = "-total"; + + public static final String THREAD_LEVEL_GROUP = "stream-metrics"; + public static final String TASK_LEVEL_GROUP = "stream-task-metrics"; + public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -60,30 +73,47 @@ public class StreamsMetricsImpl implements StreamsMetrics { public StreamsMetricsImpl(final Metrics metrics, final String threadName) { Objects.requireNonNull(metrics, "Metrics cannot be null"); + this.metrics = metrics; this.threadName = threadName; - this.metrics = metrics; - this.parentSensors = new HashMap<>(); - - final String group = "stream-metrics"; - skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); - skippedRecordsSensor.add(new MetricName("skipped-records-rate", group, "The average per-second number of skipped records", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - skippedRecordsSensor.add(new MetricName("skipped-records-total", group, "The total number of skipped records", tagMap()), new Total()); } public final Sensor threadLevelSensor(final String sensorName, - final Sensor.RecordingLevel recordingLevel, + final RecordingLevel recordingLevel, final Sensor... parents) { synchronized (threadLevelSensors) { final String fullSensorName = threadSensorPrefix() + SENSOR_NAME_DELIMITER + sensorName; final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); threadLevelSensors.push(fullSensorName); - return sensor; } } + private String threadSensorPrefix() { + return "internal" + SENSOR_PREFIX_DELIMITER + threadName; + } + + public Map threadLevelTagMap() { + final Map tagMap = new LinkedHashMap<>(); + tagMap.put(THREAD_ID_TAG, threadName); + return tagMap; + } + + public Map threadLevelTagMap(final String... tags) { + final Map tagMap = threadLevelTagMap(); + if (tags != null) { + if ((tags.length % 2) != 0) { + throw new IllegalArgumentException("Tags needs to be specified in key-value pairs"); + } + + for (int i = 0; i < tags.length; i += 2) { + tagMap.put(tags[i], tags[i + 1]); + } + } + return tagMap; + } + public final void removeAllThreadLevelSensors() { synchronized (threadLevelSensors) { while (!threadLevelSensors.isEmpty()) { @@ -92,13 +122,9 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } - private String threadSensorPrefix() { - return "internal" + SENSOR_PREFIX_DELIMITER + threadName; - } - public final Sensor taskLevelSensor(final String taskName, final String sensorName, - final Sensor.RecordingLevel recordingLevel, + final RecordingLevel recordingLevel, final Sensor... parents) { final String key = taskSensorPrefix(taskName); synchronized (taskLevelSensors) { @@ -235,10 +261,6 @@ public class StreamsMetricsImpl implements StreamsMetrics { return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store" + SENSOR_PREFIX_DELIMITER + storeName; } - public final Sensor skippedRecordsSensor() { - return skippedRecordsSensor; - } - @Override public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) { return metrics.sensor(name, recordingLevel); @@ -357,6 +379,28 @@ public class StreamsMetricsImpl implements StreamsMetrics { } + public static void addAvgAndMax(final Sensor sensor, + final String group, + final Map tags, + final String operation) { + sensor.add( + new MetricName( + operation + AVG_SUFFIX, + group, + "The average value of " + operation + ".", + tags), + new Avg() + ); + sensor.add( + new MetricName( + operation + MAX_SUFFIX, + group, + "The max value of " + operation + ".", + tags), + new Max() + ); + } + public static void addAvgMaxLatency(final Sensor sensor, final String group, final Map tags, @@ -382,25 +426,39 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static void addInvocationRateAndCount(final Sensor sensor, final String group, final Map tags, - final String operation) { + final String operation, + final String descriptionOfInvocation, + final String descriptionOfRate) { sensor.add( new MetricName( - operation + "-rate", + operation + TOTAL_SUFFIX, group, - "The average number of occurrence of " + operation + " operation per second.", - tags - ), - new Rate(TimeUnit.SECONDS, new Count()) - ); - sensor.add( - new MetricName( - operation + "-total", - group, - "The total number of occurrence of " + operation + " operations.", + descriptionOfInvocation, tags ), new CumulativeCount() ); + sensor.add( + new MetricName( + operation + RATE_SUFFIX, + group, + descriptionOfRate, + tags + ), + new Rate(TimeUnit.SECONDS, new Count()) + ); + } + + public static void addInvocationRateAndCount(final Sensor sensor, + final String group, + final Map tags, + final String operation) { + addInvocationRateAndCount(sensor, + group, + tags, + operation, + "The total number of " + operation, + "The average per-second number of " + operation); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java new file mode 100644 index 00000000000..e177667b35f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; + +public class ThreadMetrics { + private ThreadMetrics() {} + + private static final String COMMIT = "commit"; + private static final String POLL = "poll"; + private static final String PROCESS = "process"; + private static final String PUNCTUATE = "punctuate"; + private static final String CREATE_TASK = "task-created"; + private static final String CLOSE_TASK = "task-closed"; + private static final String SKIP_RECORD = "skipped-records"; + + private static final String TOTAL_DESCRIPTION = "The total number of "; + private static final String RATE_DESCRIPTION = "The average per-second number of "; + private static final String COMMIT_DESCRIPTION = "commit calls"; + private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION; + private static final String COMMIT_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_DESCRIPTION; + private static final String CREATE_TASK_DESCRIPTION = "newly created tasks"; + private static final String CREATE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CREATE_TASK_DESCRIPTION; + private static final String CREATE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CREATE_TASK_DESCRIPTION; + private static final String CLOSE_TASK_DESCRIPTION = "closed tasks"; + private static final String CLOSE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CLOSE_TASK_DESCRIPTION; + private static final String CLOSE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CLOSE_TASK_DESCRIPTION; + private static final String POLL_DESCRIPTION = "poll calls"; + private static final String POLL_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + POLL_DESCRIPTION; + private static final String POLL_RATE_DESCRIPTION = RATE_DESCRIPTION + POLL_DESCRIPTION; + private static final String PROCESS_DESCRIPTION = "process calls"; + private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION; + private static final String PROCESS_RATE_DESCRIPTION = RATE_DESCRIPTION + PROCESS_DESCRIPTION; + private static final String PUNCTUATE_DESCRIPTION = "punctuate calls"; + private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION; + private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION; + private static final String SKIP_RECORDS_DESCRIPTION = "skipped records"; + private static final String SKIP_RECORD_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + SKIP_RECORDS_DESCRIPTION; + private static final String SKIP_RECORD_RATE_DESCRIPTION = RATE_DESCRIPTION + SKIP_RECORDS_DESCRIPTION; + private static final String COMMIT_OVER_TASKS_DESCRIPTION = "commit calls over all tasks"; + private static final String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION; + private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION; + + private static final String COMMIT_LATENCY = COMMIT + LATENCY_SUFFIX; + private static final String POLL_LATENCY = POLL + LATENCY_SUFFIX; + private static final String PROCESS_LATENCY = PROCESS + LATENCY_SUFFIX; + private static final String PUNCTUATE_LATENCY = PUNCTUATE + LATENCY_SUFFIX; + + public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO); + addInvocationRateAndCount(createTaskSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + CREATE_TASK, + CREATE_TASK_TOTAL_DESCRIPTION, + CREATE_TASK_RATE_DESCRIPTION); + return createTaskSensor; + } + + public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO); + addInvocationRateAndCount(closeTaskSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + CLOSE_TASK, + CLOSE_TASK_TOTAL_DESCRIPTION, + CLOSE_TASK_RATE_DESCRIPTION); + return closeTaskSensor; + } + + public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY); + addInvocationRateAndCount(commitSensor, + THREAD_LEVEL_GROUP, + tagMap, + COMMIT, + COMMIT_TOTAL_DESCRIPTION, + COMMIT_RATE_DESCRIPTION); + return commitSensor; + } + + public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY); + addInvocationRateAndCount(pollSensor, + THREAD_LEVEL_GROUP, + tagMap, + POLL, + POLL_TOTAL_DESCRIPTION, + POLL_RATE_DESCRIPTION); + return pollSensor; + } + + public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY); + addInvocationRateAndCount(processSensor, + THREAD_LEVEL_GROUP, + tagMap, + PROCESS, + PROCESS_TOTAL_DESCRIPTION, + PROCESS_RATE_DESCRIPTION); + + return processSensor; + } + + public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY); + addInvocationRateAndCount(punctuateSensor, + THREAD_LEVEL_GROUP, + tagMap, + PUNCTUATE, + PUNCTUATE_TOTAL_DESCRIPTION, + PUNCTUATE_RATE_DESCRIPTION); + + return punctuateSensor; + } + + public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO); + addInvocationRateAndCount(skippedRecordsSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + SKIP_RECORD, + SKIP_RECORD_TOTAL_DESCRIPTION, + SKIP_RECORD_RATE_DESCRIPTION); + + return skippedRecordsSensor; + } + + public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG); + final Map tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS); + addAvgAndMax(commitOverTasksSensor, + TASK_LEVEL_GROUP, + tagMap, + COMMIT_LATENCY); + addInvocationRateAndCount(commitOverTasksSensor, + TASK_LEVEL_GROUP, + tagMap, + COMMIT, + COMMIT_OVER_TASKS_TOTAL_DESCRIPTION, + COMMIT_OVER_TASKS_RATE_DESCRIPTION); + + return commitOverTasksSensor; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 3f6c80691f0..bc4d89535e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KGroupedStream; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.state.SessionStore; @@ -37,9 +37,9 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.After; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 0dfd8ad45f3..5b207372964 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; @@ -92,6 +93,8 @@ public class KStreamSessionWindowAggregateProcessorTest { final File stateDir = TestUtils.tempDirectory(); metrics = new Metrics(); final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics); + ThreadMetrics.skipRecordSensor(metrics); + context = new InternalMockProcessorContext( stateDir, Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index ccd94de34c2..ab9a47e92af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -246,7 +246,6 @@ public class StreamTaskTest { throw new TimeoutException("test"); } }, - null, null ); fail("Expected an exception"); @@ -301,7 +300,6 @@ public class StreamTaskTest { } } }, - null, null ); testTask.initializeTopology(); @@ -851,8 +849,7 @@ public class StreamTaskTest { public void flush() { flushed.set(true); } - }, - metrics.sensor("dummy")); + }); streamTask.flushState(); assertTrue(flushed.get()); } @@ -1427,8 +1424,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); task.initializeStateStores(); task.initializeTopology(); @@ -1498,8 +1494,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { @@ -1520,8 +1515,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { @@ -1546,8 +1540,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } // this task will throw exception when processing (on partition2), flushing, suspending and closing @@ -1573,8 +1566,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")) { + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)) { @Override protected void flushState() { throw new RuntimeException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 1de39d2274b..7c17ca7a45c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -254,35 +255,70 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); final String defaultGroupName = "stream-metrics"; final Map defaultTags = Collections.singletonMap("client-id", thread.getName()); + final String descriptionIsNotVerified = ""; - assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-total", defaultGroupName, "The total number of commit calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-avg", defaultGroupName, "The average poll time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-max", defaultGroupName, "The maximum poll time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-total", defaultGroupName, "The total number of record-poll calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-avg", defaultGroupName, "The average process time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-max", defaultGroupName, "The maximum process time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-rate", defaultGroupName, "The average per-second number of process calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-total", defaultGroupName, "The total number of process calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-avg", defaultGroupName, "The average punctuate time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-total", defaultGroupName, "The total number of punctuate calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-created-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-created-total", defaultGroupName, "The total number of newly created tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-rate", defaultGroupName, "The average per-second number of closed tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-total", defaultGroupName, "The total number of closed tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName, "The average per-second number of skipped records.", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-total", defaultGroupName, "The total number of skipped records.", defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-created-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-created-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-closed-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-closed-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "skipped-records-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "skipped-records-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + + final String taskGroupName = "stream-task-metrics"; + final Map taskTags = + mkMap(mkEntry("task-id", "all"), mkEntry("client-id", thread.getName())); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-avg", taskGroupName, descriptionIsNotVerified, taskTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-max", taskGroupName, descriptionIsNotVerified, taskTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-rate", taskGroupName, descriptionIsNotVerified, taskTags))); final JmxReporter reporter = new JmxReporter("kafka.streams"); metrics.addReporter(reporter); assertEquals(clientId + "-StreamThread-1", thread.getName()); assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s", - defaultGroupName, thread.getName()))); + defaultGroupName, + thread.getName()))); + assertTrue(reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=" + thread.getName() + ",task-id=all")); } @Test @@ -296,8 +332,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -423,8 +458,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -459,8 +493,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -610,8 +643,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -644,8 +676,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -672,8 +703,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -1449,8 +1479,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -1489,8 +1518,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index e1d6f0c086c..f381a58c123 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -22,7 +22,10 @@ import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.MockTime; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.junit.Test; import java.util.Collections; @@ -38,8 +41,34 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; -public class StreamsMetricsImplTest { +public class StreamsMetricsImplTest extends EasyMockSupport { + + private final static String SENSOR_PREFIX_DELIMITER = "."; + private final static String SENSOR_NAME_DELIMITER = ".s."; + private final static String INTERNAL_PREFIX = "internal"; + + @Test + public void shouldGetThreadLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final String threadName = "thread1"; + final String sensorName = "sensor1"; + final String expectedFullSensorName = + INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + sensorName; + final RecordingLevel recordingLevel = RecordingLevel.DEBUG; + final Sensor[] parents = {}; + EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null); + + replayAll(); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel); + + verifyAll(); + + assertNull(sensor); + } @Test(expected = NullPointerException.class) public void testNullMetrics() { @@ -92,13 +121,13 @@ public class StreamsMetricsImplTest { final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); final int numberOfTaskMetrics = registry.metrics().size(); final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1); addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); @@ -108,13 +137,13 @@ public class StreamsMetricsImplTest { final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2); addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java new file mode 100644 index 00000000000..89395d9c51a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.powermock.api.easymock.PowerMock.createStrictMock; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verify; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(StreamsMetricsImpl.class) +public class ThreadMetricsTest { + + private static final String THREAD_LEVEL_GROUP = "stream-metrics"; + private static final String TASK_LEVEL_GROUP = "stream-task-metrics"; + + private final Metrics dummyMetrics = new Metrics(); + private final Sensor dummySensor = dummyMetrics.sensor("dummy"); + private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class); + private final Map dummyTagMap = Collections.singletonMap("hello", "world"); + + @Test + public void shouldGetCreateTaskSensor() { + final String operation = "task-created"; + final String totalDescription = "The total number of newly created tasks"; + final String rateDescription = "The average per-second number of newly created tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.createTaskSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCloseTaskSensor() { + final String operation = "task-closed"; + final String totalDescription = "The total number of closed tasks"; + final String rateDescription = "The average per-second number of closed tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.closeTaskSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCommitSensor() { + final String operation = "commit"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of commit calls"; + final String rateDescription = "The average per-second number of commit calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.commitSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetPollSensor() { + final String operation = "poll"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of poll calls"; + final String rateDescription = "The average per-second number of poll calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.pollSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetProcessSensor() { + final String operation = "process"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of process calls"; + final String rateDescription = "The average per-second number of process calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.processSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetPunctuateSensor() { + final String operation = "punctuate"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of punctuate calls"; + final String rateDescription = "The average per-second number of punctuate calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.punctuateSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetSkipRecordSensor() { + final String operation = "skipped-records"; + final String totalDescription = "The total number of skipped records"; + final String rateDescription = "The average per-second number of skipped records"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.skipRecordSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCommitOverTasksSensor() { + final String operation = "commit"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of commit calls over all tasks"; + final String rateDescription = "The average per-second number of commit calls over all tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.commitOverTasksSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index da2d46d6360..f48c31c1d91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -317,8 +317,7 @@ public class StreamThreadStateStoreProviderTest { stateDirectory, null, new MockTime(), - () -> clientSupplier.getProducer(new HashMap<>()), - metrics.sensor("dummy")) { + () -> clientSupplier.getProducer(new HashMap<>())) { @Override protected void updateOffsetLimits() {} }; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 38da0d82509..48c038c2d5f 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -32,6 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; @@ -277,10 +280,21 @@ public class TopologyTestDriver implements Closeable { .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); metrics = new Metrics(metricConfig, mockWallClockTime); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( - metrics, - "topology-test-driver-virtual-thread" - ); + + final String threadName = "topology-test-driver-virtual-thread"; + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); + final String threadLevelGroup = "stream-metrics"; + skippedRecordsSensor.add(new MetricName("skipped-records-rate", + threadLevelGroup, + "The average per-second number of skipped records", + streamsMetrics.tagMap()), + new Rate(TimeUnit.SECONDS, new Count())); + skippedRecordsSensor.add(new MetricName("skipped-records-total", + threadLevelGroup, + "The total number of skipped records", + streamsMetrics.tagMap()), + new Total()); final ThreadCache cache = new ThreadCache( new LogContext("topology-test-driver "), Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)), @@ -360,8 +374,7 @@ public class TopologyTestDriver implements Closeable { stateDirectory, cache, mockWallClockTime, - () -> producer, - metrics.sensor("dummy")); + () -> producer); task.initializeStateStores(); task.initializeTopology(); ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext( diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 34a7ed92c68..ed0274649bf 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import java.io.File; @@ -214,10 +215,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S this.stateDir = stateDir; final MetricConfig metricConfig = new MetricConfig(); metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); - this.metrics = new StreamsMetricsImpl( - new Metrics(metricConfig), - "mock-processor-context-virtual-thread" - ); + final String threadName = "mock-processor-context-virtual-thread"; + this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName); + ThreadMetrics.skipRecordSensor(metrics); } @Override