From 17712b96c86d6c2cce695531d7e5f2e3d5718b21 Mon Sep 17 00:00:00 2001 From: cadonna Date: Mon, 3 Jun 2019 23:31:19 +0200 Subject: [PATCH] KAFKA-6819: Pt. 1 - Refactor thread-level Streams metrics (#6631) * StreamsMetricsImpl wraps the Kafka Streams' metrics registry and provides logic to create and register sensors and their corresponding metrics. An example for such logic can be found in threadLevelSensor(). Furthermore, StreamsMetricsImpl keeps track of the sensors on the different levels of an application, i.e., thread, task, etc., and provides logic to remove sensors per level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per application instance. * ThreadMetrics contains only static methods that specify all built-in thread-level sensors and metrics and provide logic to register and retrieve those thread-level sensors, e.g., commitSensor(). * From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics. * ThreadMetrics does not inherit from StreamsMetricsImpl anymore. Reviewers: A. 
Sophie Blee-Goldman , John Roesler , Guozhang Wang --- build.gradle | 2 + .../kstream/internals/KStreamAggregate.java | 6 +- .../kstream/internals/KStreamKStreamJoin.java | 7 +- .../internals/KStreamKTableJoinProcessor.java | 7 +- .../kstream/internals/KStreamReduce.java | 6 +- .../KStreamSessionWindowAggregate.java | 5 +- .../internals/KStreamWindowAggregate.java | 9 +- .../internals/KTableKTableInnerJoin.java | 6 +- .../internals/KTableKTableLeftJoin.java | 6 +- .../internals/KTableKTableOuterJoin.java | 6 +- .../internals/KTableKTableRightJoin.java | 6 +- .../kstream/internals/KTableSource.java | 6 +- .../internals/GlobalStateUpdateTask.java | 3 +- .../processor/internals/RecordQueue.java | 16 +- .../processor/internals/StreamTask.java | 44 +--- .../processor/internals/StreamThread.java | 110 +++----- .../internals/metrics/StreamsMetricsImpl.java | 122 ++++++--- .../internals/metrics/ThreadMetrics.java | 179 +++++++++++++ .../integration/MetricsIntegrationTest.java | 8 +- ...amSessionWindowAggregateProcessorTest.java | 3 + .../processor/internals/StreamTaskTest.java | 20 +- .../processor/internals/StreamThreadTest.java | 106 +++++--- .../metrics/StreamsMetricsImplTest.java | 39 ++- .../internals/metrics/ThreadMetricsTest.java | 244 ++++++++++++++++++ .../StreamThreadStateStoreProviderTest.java | 3 +- .../kafka/streams/TopologyTestDriver.java | 25 +- .../processor/MockProcessorContext.java | 8 +- 27 files changed, 778 insertions(+), 224 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java diff --git a/build.gradle b/build.gradle index 07200a80b5f..516a06cfed2 100644 --- a/build.gradle +++ b/build.gradle @@ -1120,6 +1120,8 @@ project(':streams') { testCompile libs.log4j testCompile libs.junit testCompile libs.easymock + testCompile libs.powermockJunit4 + testCompile 
libs.powermockEasymock testCompile libs.bcpkix testCompile libs.hamcrest diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 8fd4f5b4b7d..ca7266f5aba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -57,6 +59,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier { private TimestampedKeyValueStore store; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; private TimestampedTupleForwarder tupleForwarder; @SuppressWarnings("unchecked") @@ -64,6 +67,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier) context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( store, @@ -80,7 +84,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); @@ -57,12 +58,14 @@ class KStreamKStreamJoin implements ProcessorSupplier { private WindowStore otherWindow; private StreamsMetricsImpl 
metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); otherWindow = (WindowStore) context.getStateStore(otherWindowName); } @@ -81,7 +84,7 @@ class KStreamKStreamJoin implements ProcessorSupplier { "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index dcf0799c190..92fd4d52e5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ class KStreamKTableJoinProcessor extends AbstractProcessor joiner; private final boolean leftJoin; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, final KeyValueMapper keyMapper, @@ 
-49,6 +52,8 @@ class KStreamKTableJoinProcessor extends AbstractProcessor extends AbstractProcessor implements KStreamAggProcessorSupplier store; private TimestampedTupleForwarder tupleForwarder; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); store = (TimestampedKeyValueStore) context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( store, @@ -78,7 +82,7 @@ public class KStreamReduce implements KStreamAggProcessorSupplier implements KStreamAggProce private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private Sensor skippedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @@ -92,6 +94,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce internalProcessorContext = (InternalProcessorContext) context; metrics = (StreamsMetricsImpl) context.metrics(); lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); store = (SessionStore) context.getStateStore(storeName); tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues); @@ -106,7 +109,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce "Skipping record due to null key. 
value=[{}] topic=[{}] partition=[{}] offset=[{}]", value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 3458ca02e1c..2983a3a9c7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -79,6 +80,7 @@ public class KStreamWindowAggregate implements KStr private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private Sensor skippedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @@ -86,8 +88,11 @@ public class KStreamWindowAggregate implements KStr public void init(final ProcessorContext context) { super.init(context); internalProcessorContext = (InternalProcessorContext) context; - metrics = (StreamsMetricsImpl) context.metrics(); + + metrics = internalProcessorContext.metrics(); + lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName); tupleForwarder = new TimestampedTupleForwarder<>( windowStore, @@ -103,7 +108,7 @@ public class KStreamWindowAggregate implements KStr "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", value, context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java index 2d4cbc8415c..005ea809b3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,7 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -75,6 +78,7 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin 
valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableLeftJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -74,6 +77,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableOuterJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -73,6 +76,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin valueGetter; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; KTableKTableRightJoinProcessor(final KTableValueGetter valueGetter) { this.valueGetter = valueGetter; @@ -72,6 +75,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin implements ProcessorSupplier { private TimestampedKeyValueStore store; private TimestampedTupleForwarder tupleForwarder; private StreamsMetricsImpl metrics; + private Sensor skippedRecordsSensor; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); + skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics); if (queryableName != null) { store = (TimestampedKeyValueStore) context.getStateStore(queryableName); tupleForwarder = new TimestampedTupleForwarder<>( @@ -94,7 +98,7 @@ public class KTableSource implements ProcessorSupplier { "Skipping record due to null key. 
topic=[{}] partition=[{}] offset=[{}]", context().topic(), context().partition(), context().offset() ); - metrics.skippedRecordsSensor().record(); + skippedRecordsSensor.record(); return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 29c861e4291..d6f60e35133 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import java.io.IOException; import java.util.HashMap; @@ -69,7 +70,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { source, deserializationExceptionHandler, logContext, - processorContext.metrics().skippedRecordsSensor() + ThreadMetrics.skipRecordSensor(processorContext.metrics()) ) ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 86f5be383cd..6f3e70b9922 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -18,17 +18,17 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import 
org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.slf4j.Logger; import java.util.ArrayDeque; - /** * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition @@ -48,6 +48,8 @@ public class RecordQueue { private StampedRecord headRecord = null; + private Sensor skipRecordsSensor; + RecordQueue(final TopicPartition partition, final SourceNode source, final TimestampExtractor timestampExtractor, @@ -58,13 +60,14 @@ public class RecordQueue { this.partition = partition; this.fifoQueue = new ArrayDeque<>(); this.timestampExtractor = timestampExtractor; - this.recordDeserializer = new RecordDeserializer( + this.processorContext = processorContext; + skipRecordsSensor = ThreadMetrics.skipRecordSensor(processorContext.metrics()); + recordDeserializer = new RecordDeserializer( source, deserializationExceptionHandler, logContext, - processorContext.metrics().skippedRecordsSensor() + skipRecordsSensor ); - this.processorContext = processorContext; this.log = logContext.logger(RecordQueue.class); } @@ -180,7 +183,8 @@ public class RecordQueue { "Skipping record due to negative extracted timestamp. 
topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() ); - ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); + + skipRecordsSensor.record(); continue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 80653c5e5d4..4fd57ba3db8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; @@ -78,7 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator private final PunctuationQueue systemTimePunctuationQueue; private final ProducerSupplier producerSupplier; - private Sensor closeSensor; + private Sensor closeTaskSensor; private long idleStartTime; private Producer producer; private boolean commitRequested = false; @@ -96,24 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final String group = "stream-task-metrics"; // first add the global operation metrics if not yet, with the global tags only - final Map allTagMap = metrics.tagMap("task-id", "all"); - final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG); - parent.add( - new MetricName("commit-latency-avg", group, "The 
average latency of commit operation.", allTagMap), - new Avg() - ); - parent.add( - new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap), - new Max() - ); - parent.add( - new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap), - new Rate(TimeUnit.SECONDS, new Count()) - ); - parent.add( - new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap), - new CumulativeCount() - ); + final Sensor parent = ThreadMetrics.commitOverTasksSensor(metrics); // add the operation metrics with additional tags final Map tagMap = metrics.tagMap("task-id", taskName); @@ -167,9 +151,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final StateDirectory stateDirectory, final ThreadCache cache, final Time time, - final ProducerSupplier producerSupplier, - final Sensor closeSensor) { - this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null, closeSensor); + final ProducerSupplier producerSupplier) { + this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null); } public StreamTask(final TaskId id, @@ -178,20 +161,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final Consumer consumer, final ChangelogReader changelogReader, final StreamsConfig config, - final StreamsMetricsImpl metrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ThreadCache cache, final Time time, final ProducerSupplier producerSupplier, - final RecordCollector recordCollector, - final Sensor closeSensor) { + final RecordCollector recordCollector) { super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config); this.time = time; this.producerSupplier = producerSupplier; this.producer = 
producerSupplier.get(); - this.closeSensor = closeSensor; - this.taskMetrics = new TaskMetrics(id, metrics); + this.taskMetrics = new TaskMetrics(id, streamsMetrics); + + closeTaskSensor = ThreadMetrics.closeTaskSensor(streamsMetrics); final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler(); @@ -200,8 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator id.toString(), logContext, productionExceptionHandler, - metrics.skippedRecordsSensor() - ); + ThreadMetrics.skipRecordSensor(streamsMetrics)); } else { this.recordCollector = recordCollector; } @@ -220,7 +202,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final Map partitionQueues = new HashMap<>(); // initialize the topology with its own context - final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache); + final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache); processorContext = processorContextImpl; final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); @@ -691,7 +673,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator partitionGroup.close(); taskMetrics.removeAllSensors(); - closeSensor.record(); + closeTaskSensor.record(); if (firstException != null) { throw firstException; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 419e1811963..4dc1bde9f3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -31,9 +31,6 @@ import org.apache.kafka.common.MetricName; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Count; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -45,8 +42,8 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; -import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -62,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; @@ -340,7 +336,7 @@ public class StreamThread extends Thread { final String applicationId; final InternalTopologyBuilder builder; final StreamsConfig config; - final StreamsMetricsThreadImpl streamsMetrics; + final StreamsMetricsImpl streamsMetrics; final StateDirectory stateDirectory; final ChangelogReader storeChangelogReader; final Time time; @@ -349,7 +345,7 @@ public class StreamThread extends Thread { AbstractTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final Time time, @@ -398,10 +394,11 @@ public class StreamThread 
extends Thread { private final KafkaClientSupplier clientSupplier; private final String threadClientId; private final Producer threadProducer; + private final Sensor createTaskSensor; TaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final ThreadCache cache, @@ -422,13 +419,14 @@ public class StreamThread extends Thread { this.clientSupplier = clientSupplier; this.threadProducer = threadProducer; this.threadClientId = threadClientId; + createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics); } @Override StreamTask createTask(final Consumer consumer, final TaskId taskId, final Set partitions) { - streamsMetrics.taskCreatedSensor.record(); + createTaskSensor.record(); return new StreamTask( taskId, @@ -441,8 +439,7 @@ public class StreamThread extends Thread { stateDirectory, cache, time, - () -> createProducer(taskId), - streamsMetrics.taskClosedSensor); + () -> createProducer(taskId)); } private Producer createProducer(final TaskId id) { @@ -470,9 +467,11 @@ public class StreamThread extends Thread { } static class StandbyTaskCreator extends AbstractTaskCreator { + private final Sensor createTaskSensor; + StandbyTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final Time time, @@ -485,13 +484,14 @@ public class StreamThread extends Thread { storeChangelogReader, time, log); + createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics); } @Override StandbyTask createTask(final Consumer consumer, final TaskId taskId, final Set partitions) { - streamsMetrics.taskCreatedSensor.record(); + createTaskSensor.record(); final ProcessorTopology topology = 
builder.build(taskId.topicGroupId); @@ -516,47 +516,6 @@ public class StreamThread extends Thread { } } - static class StreamsMetricsThreadImpl extends StreamsMetricsImpl { - - private final Sensor commitTimeSensor; - private final Sensor pollTimeSensor; - private final Sensor processTimeSensor; - private final Sensor punctuateTimeSensor; - private final Sensor taskCreatedSensor; - private final Sensor taskClosedSensor; - - StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) { - super(metrics, threadName); - final String group = "stream-metrics"; - - commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit"); - addInvocationRateAndCount(commitTimeSensor, group, tagMap(), "commit"); - - pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll"); - // can't use addInvocationRateAndCount due to non-standard description string - pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount()); - - processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(processTimeSensor, group, tagMap(), "process"); - addInvocationRateAndCount(processTimeSensor, group, tagMap(), "process"); - - punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO); - addAvgMaxLatency(punctuateTimeSensor, group, tagMap(), "punctuate"); - addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(), "punctuate"); - - taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO); - taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average 
per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tagMap()), new Total()); - - taskClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO); - taskClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - taskClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tagMap()), new Total()); - } - } - private final Time time; private final Logger log; private final String logPrefix; @@ -566,9 +525,14 @@ public class StreamThread extends Thread { private final int maxPollTimeMs; private final String originalReset; private final TaskManager taskManager; - private final StreamsMetricsThreadImpl streamsMetrics; private final AtomicInteger assignmentErrorCode; + private final StreamsMetricsImpl streamsMetrics; + private final Sensor commitSensor; + private final Sensor pollSensor; + private final Sensor punctuateSensor; + private final Sensor processSensor; + private long now; private long lastPollMs; private long lastCommitMs; @@ -620,10 +584,7 @@ public class StreamThread extends Thread { threadProducer = clientSupplier.getProducer(producerConfigs); } - final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl( - metrics, - threadClientId - ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); @@ -697,7 +658,7 @@ public class StreamThread extends Thread { final Consumer consumer, final String originalReset, final TaskManager taskManager, - final StreamsMetricsThreadImpl streamsMetrics, + final StreamsMetricsImpl streamsMetrics, final InternalTopologyBuilder builder, final String 
threadClientId, final LogContext logContext, @@ -707,9 +668,24 @@ public class StreamThread extends Thread { this.stateLock = new Object(); this.standbyRecords = new HashMap<>(); + this.streamsMetrics = streamsMetrics; + this.commitSensor = ThreadMetrics.commitSensor(streamsMetrics); + this.pollSensor = ThreadMetrics.pollSensor(streamsMetrics); + this.processSensor = ThreadMetrics.processSensor(streamsMetrics); + this.punctuateSensor = ThreadMetrics.punctuateSensor(streamsMetrics); + + // The following sensors are created here but their references are not stored in this object, since within + // this object they are not recorded. The sensors are created here so that the stream threads starts with all + // its metrics initialised. Otherwise, those sensors would have been created during processing, which could + // lead to missing metrics. For instance, if no task were created, the metrics for created and closed + // tasks would never be added to the metrics. + ThreadMetrics.createTaskSensor(streamsMetrics); + ThreadMetrics.closeTaskSensor(streamsMetrics); + ThreadMetrics.skipRecordSensor(streamsMetrics); + ThreadMetrics.commitOverTasksSensor(streamsMetrics); + this.time = time; this.builder = builder; - this.streamsMetrics = streamsMetrics; this.logPrefix = logContext.logPrefix(); this.log = logContext.logger(StreamThread.class); this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log); @@ -857,7 +833,7 @@ public class StreamThread extends Thread { final long pollLatency = advanceNowAndComputeLatency(); if (records != null && !records.isEmpty()) { - streamsMetrics.pollTimeSensor.record(pollLatency, now); + pollSensor.record(pollLatency, now); addRecordsToTasks(records); } @@ -891,14 +867,14 @@ public class StreamThread extends Thread { if (processed > 0) { final long processLatency = advanceNowAndComputeLatency(); - streamsMetrics.processTimeSensor.record(processLatency / (double) processed, now); + processSensor.record(processLatency / 
(double) processed, now); // commit any tasks that have requested a commit final int committed = taskManager.maybeCommitActiveTasksPerUserRequested(); if (committed > 0) { final long commitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now); + commitSensor.record(commitLatency / (double) committed, now); } } else { // if there is no records to be processed, exit immediately @@ -1031,7 +1007,7 @@ public class StreamThread extends Thread { final int punctuated = taskManager.punctuate(); if (punctuated > 0) { final long punctuateLatency = advanceNowAndComputeLatency(); - streamsMetrics.punctuateTimeSensor.record(punctuateLatency / (double) punctuated, now); + punctuateSensor.record(punctuateLatency / (double) punctuated, now); } return punctuated > 0; @@ -1057,7 +1033,7 @@ public class StreamThread extends Thread { committed += taskManager.commitAll(); if (committed > 0) { final long intervalCommitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now); + commitSensor.record(intervalCommitLatency / (double) committed, now); // try to purge the committed records for repartition topics if possible taskManager.maybePurgeCommitedRecords(); @@ -1074,7 +1050,7 @@ public class StreamThread extends Thread { final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested(); if (commitPerRequested > 0) { final long requestCommitLatency = advanceNowAndComputeLatency(); - streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now); + commitSensor.record(requestCommitLatency / (double) committed, now); committed += commitPerRequested; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 0a47fce8101..46b5669d601 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -20,11 +20,11 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.streams.StreamsMetrics; import java.util.Arrays; @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; public class StreamsMetricsImpl implements StreamsMetrics { private final Metrics metrics; private final Map parentSensors; - private final Sensor skippedRecordsSensor; private final String threadName; private final Deque threadLevelSensors = new LinkedList<>(); @@ -52,6 +51,20 @@ public class StreamsMetricsImpl implements StreamsMetrics { private static final String SENSOR_PREFIX_DELIMITER = "."; private static final String SENSOR_NAME_DELIMITER = ".s."; + public static final String THREAD_ID_TAG = "client-id"; + public static final String TASK_ID_TAG = "task-id"; + + public static final String ALL_TASKS = "all"; + + public static final String LATENCY_SUFFIX = "-latency"; + public static final String AVG_SUFFIX = "-avg"; + public static final String MAX_SUFFIX = "-max"; + public static final String RATE_SUFFIX = "-rate"; + public static final String TOTAL_SUFFIX = "-total"; + + public static final String THREAD_LEVEL_GROUP = "stream-metrics"; + public static final String TASK_LEVEL_GROUP = "stream-task-metrics"; + public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String 
PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -60,30 +73,47 @@ public class StreamsMetricsImpl implements StreamsMetrics { public StreamsMetricsImpl(final Metrics metrics, final String threadName) { Objects.requireNonNull(metrics, "Metrics cannot be null"); + this.metrics = metrics; this.threadName = threadName; - this.metrics = metrics; - this.parentSensors = new HashMap<>(); - - final String group = "stream-metrics"; - skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); - skippedRecordsSensor.add(new MetricName("skipped-records-rate", group, "The average per-second number of skipped records", tagMap()), new Rate(TimeUnit.SECONDS, new Count())); - skippedRecordsSensor.add(new MetricName("skipped-records-total", group, "The total number of skipped records", tagMap()), new Total()); } public final Sensor threadLevelSensor(final String sensorName, - final Sensor.RecordingLevel recordingLevel, + final RecordingLevel recordingLevel, final Sensor... parents) { synchronized (threadLevelSensors) { final String fullSensorName = threadSensorPrefix() + SENSOR_NAME_DELIMITER + sensorName; final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); threadLevelSensors.push(fullSensorName); - return sensor; } } + private String threadSensorPrefix() { + return "internal" + SENSOR_PREFIX_DELIMITER + threadName; + } + + public Map threadLevelTagMap() { + final Map tagMap = new LinkedHashMap<>(); + tagMap.put(THREAD_ID_TAG, threadName); + return tagMap; + } + + public Map threadLevelTagMap(final String... 
tags) { + final Map tagMap = threadLevelTagMap(); + if (tags != null) { + if ((tags.length % 2) != 0) { + throw new IllegalArgumentException("Tags needs to be specified in key-value pairs"); + } + + for (int i = 0; i < tags.length; i += 2) { + tagMap.put(tags[i], tags[i + 1]); + } + } + return tagMap; + } + public final void removeAllThreadLevelSensors() { synchronized (threadLevelSensors) { while (!threadLevelSensors.isEmpty()) { @@ -92,13 +122,9 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } - private String threadSensorPrefix() { - return "internal" + SENSOR_PREFIX_DELIMITER + threadName; - } - public final Sensor taskLevelSensor(final String taskName, final String sensorName, - final Sensor.RecordingLevel recordingLevel, + final RecordingLevel recordingLevel, final Sensor... parents) { final String key = taskSensorPrefix(taskName); synchronized (taskLevelSensors) { @@ -235,10 +261,6 @@ public class StreamsMetricsImpl implements StreamsMetrics { return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store" + SENSOR_PREFIX_DELIMITER + storeName; } - public final Sensor skippedRecordsSensor() { - return skippedRecordsSensor; - } - @Override public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) { return metrics.sensor(name, recordingLevel); @@ -357,6 +379,28 @@ public class StreamsMetricsImpl implements StreamsMetrics { } + public static void addAvgAndMax(final Sensor sensor, + final String group, + final Map tags, + final String operation) { + sensor.add( + new MetricName( + operation + AVG_SUFFIX, + group, + "The average value of " + operation + ".", + tags), + new Avg() + ); + sensor.add( + new MetricName( + operation + MAX_SUFFIX, + group, + "The max value of " + operation + ".", + tags), + new Max() + ); + } + public static void addAvgMaxLatency(final Sensor sensor, final String group, final Map tags, @@ -382,25 +426,39 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static void 
addInvocationRateAndCount(final Sensor sensor, final String group, final Map tags, - final String operation) { + final String operation, + final String descriptionOfInvocation, + final String descriptionOfRate) { sensor.add( new MetricName( - operation + "-rate", + operation + TOTAL_SUFFIX, group, - "The average number of occurrence of " + operation + " operation per second.", - tags - ), - new Rate(TimeUnit.SECONDS, new Count()) - ); - sensor.add( - new MetricName( - operation + "-total", - group, - "The total number of occurrence of " + operation + " operations.", + descriptionOfInvocation, tags ), new CumulativeCount() ); + sensor.add( + new MetricName( + operation + RATE_SUFFIX, + group, + descriptionOfRate, + tags + ), + new Rate(TimeUnit.SECONDS, new Count()) + ); + } + + public static void addInvocationRateAndCount(final Sensor sensor, + final String group, + final Map tags, + final String operation) { + addInvocationRateAndCount(sensor, + group, + tags, + operation, + "The total number of " + operation, + "The average per-second number of " + operation); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java new file mode 100644 index 00000000000..e177667b35f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; + +public class ThreadMetrics { + private ThreadMetrics() {} + + private static final String COMMIT = "commit"; + private static final String POLL = "poll"; + private static final String PROCESS = "process"; + private static final String PUNCTUATE = "punctuate"; + private static final String CREATE_TASK = "task-created"; + private static final String CLOSE_TASK = "task-closed"; + private static final String SKIP_RECORD = "skipped-records"; + + private static final String TOTAL_DESCRIPTION = "The total number of "; + private static final String RATE_DESCRIPTION = "The average per-second number of "; + private static final String 
COMMIT_DESCRIPTION = "commit calls"; + private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION; + private static final String COMMIT_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_DESCRIPTION; + private static final String CREATE_TASK_DESCRIPTION = "newly created tasks"; + private static final String CREATE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CREATE_TASK_DESCRIPTION; + private static final String CREATE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CREATE_TASK_DESCRIPTION; + private static final String CLOSE_TASK_DESCRIPTION = "closed tasks"; + private static final String CLOSE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CLOSE_TASK_DESCRIPTION; + private static final String CLOSE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CLOSE_TASK_DESCRIPTION; + private static final String POLL_DESCRIPTION = "poll calls"; + private static final String POLL_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + POLL_DESCRIPTION; + private static final String POLL_RATE_DESCRIPTION = RATE_DESCRIPTION + POLL_DESCRIPTION; + private static final String PROCESS_DESCRIPTION = "process calls"; + private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION; + private static final String PROCESS_RATE_DESCRIPTION = RATE_DESCRIPTION + PROCESS_DESCRIPTION; + private static final String PUNCTUATE_DESCRIPTION = "punctuate calls"; + private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION; + private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION; + private static final String SKIP_RECORDS_DESCRIPTION = "skipped records"; + private static final String SKIP_RECORD_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + SKIP_RECORDS_DESCRIPTION; + private static final String SKIP_RECORD_RATE_DESCRIPTION = RATE_DESCRIPTION + SKIP_RECORDS_DESCRIPTION; + private static final String COMMIT_OVER_TASKS_DESCRIPTION = "commit calls over all tasks"; + private static final 
String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION; + private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION; + + private static final String COMMIT_LATENCY = COMMIT + LATENCY_SUFFIX; + private static final String POLL_LATENCY = POLL + LATENCY_SUFFIX; + private static final String PROCESS_LATENCY = PROCESS + LATENCY_SUFFIX; + private static final String PUNCTUATE_LATENCY = PUNCTUATE + LATENCY_SUFFIX; + + public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO); + addInvocationRateAndCount(createTaskSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + CREATE_TASK, + CREATE_TASK_TOTAL_DESCRIPTION, + CREATE_TASK_RATE_DESCRIPTION); + return createTaskSensor; + } + + public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO); + addInvocationRateAndCount(closeTaskSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + CLOSE_TASK, + CLOSE_TASK_TOTAL_DESCRIPTION, + CLOSE_TASK_RATE_DESCRIPTION); + return closeTaskSensor; + } + + public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY); + addInvocationRateAndCount(commitSensor, + THREAD_LEVEL_GROUP, + tagMap, + COMMIT, + COMMIT_TOTAL_DESCRIPTION, + COMMIT_RATE_DESCRIPTION); + return commitSensor; + } + + public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO); + final Map tagMap = 
streamsMetrics.threadLevelTagMap(); + addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY); + addInvocationRateAndCount(pollSensor, + THREAD_LEVEL_GROUP, + tagMap, + POLL, + POLL_TOTAL_DESCRIPTION, + POLL_RATE_DESCRIPTION); + return pollSensor; + } + + public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY); + addInvocationRateAndCount(processSensor, + THREAD_LEVEL_GROUP, + tagMap, + PROCESS, + PROCESS_TOTAL_DESCRIPTION, + PROCESS_RATE_DESCRIPTION); + + return processSensor; + } + + public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(); + addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY); + addInvocationRateAndCount(punctuateSensor, + THREAD_LEVEL_GROUP, + tagMap, + PUNCTUATE, + PUNCTUATE_TOTAL_DESCRIPTION, + PUNCTUATE_RATE_DESCRIPTION); + + return punctuateSensor; + } + + public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO); + addInvocationRateAndCount(skippedRecordsSensor, + THREAD_LEVEL_GROUP, + streamsMetrics.threadLevelTagMap(), + SKIP_RECORD, + SKIP_RECORD_TOTAL_DESCRIPTION, + SKIP_RECORD_RATE_DESCRIPTION); + + return skippedRecordsSensor; + } + + public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) { + final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG); + final Map tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS); + 
addAvgAndMax(commitOverTasksSensor, + TASK_LEVEL_GROUP, + tagMap, + COMMIT_LATENCY); + addInvocationRateAndCount(commitOverTasksSensor, + TASK_LEVEL_GROUP, + tagMap, + COMMIT, + COMMIT_OVER_TASKS_TOTAL_DESCRIPTION, + COMMIT_OVER_TASKS_RATE_DESCRIPTION); + + return commitOverTasksSensor; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 3f6c80691f0..bc4d89535e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KGroupedStream; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.state.SessionStore; @@ -37,9 +37,9 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.After; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 0dfd8ad45f3..5b207372964 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; @@ -92,6 +93,8 @@ public class KStreamSessionWindowAggregateProcessorTest { final File stateDir = TestUtils.tempDirectory(); metrics = new Metrics(); final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics); + ThreadMetrics.skipRecordSensor(metrics); + context = new InternalMockProcessorContext( stateDir, Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index ccd94de34c2..ab9a47e92af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -246,7 +246,6 @@ public class StreamTaskTest { throw new TimeoutException("test"); } }, - null, null ); fail("Expected an exception"); @@ -301,7 +300,6 
@@ public class StreamTaskTest { } } }, - null, null ); testTask.initializeTopology(); @@ -851,8 +849,7 @@ public class StreamTaskTest { public void flush() { flushed.set(true); } - }, - metrics.sensor("dummy")); + }); streamTask.flushState(); assertTrue(flushed.get()); } @@ -1427,8 +1424,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); task.initializeStateStores(); task.initializeTopology(); @@ -1498,8 +1494,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { @@ -1520,8 +1515,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { @@ -1546,8 +1540,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")); + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); } // this task will throw exception when processing (on partition2), flushing, suspending and closing @@ -1573,8 +1566,7 @@ public class StreamTaskTest { stateDirectory, null, time, - () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer), - metrics.sensor("dummy")) { + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)) { @Override protected void flushState() { throw new 
RuntimeException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 1de39d2274b..7c17ca7a45c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -254,35 +255,70 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); final String defaultGroupName = "stream-metrics"; final Map defaultTags = Collections.singletonMap("client-id", thread.getName()); + final String descriptionIsNotVerified = ""; - assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-total", defaultGroupName, "The total number of commit calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-avg", defaultGroupName, "The average poll time in ms", 
defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-max", defaultGroupName, "The maximum poll time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("poll-total", defaultGroupName, "The total number of record-poll calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-avg", defaultGroupName, "The average process time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-max", defaultGroupName, "The maximum process time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-rate", defaultGroupName, "The average per-second number of process calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("process-total", defaultGroupName, "The total number of process calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-avg", defaultGroupName, "The average punctuate time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-total", defaultGroupName, "The total number of punctuate calls", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-created-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-created-total", defaultGroupName, "The total number of newly created tasks", defaultTags))); - 
assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-rate", defaultGroupName, "The average per-second number of closed tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-total", defaultGroupName, "The total number of closed tasks", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName, "The average per-second number of skipped records.", defaultTags))); - assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-total", defaultGroupName, "The total number of skipped records.", defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-latency-max", defaultGroupName, 
descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-created-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-created-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-closed-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "task-closed-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "skipped-records-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "skipped-records-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + + final String taskGroupName = "stream-task-metrics"; + final Map taskTags = + mkMap(mkEntry("task-id", "all"), mkEntry("client-id", thread.getName())); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-avg", taskGroupName, 
descriptionIsNotVerified, taskTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-latency-max", taskGroupName, descriptionIsNotVerified, taskTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-rate", taskGroupName, descriptionIsNotVerified, taskTags))); final JmxReporter reporter = new JmxReporter("kafka.streams"); metrics.addReporter(reporter); assertEquals(clientId + "-StreamThread-1", thread.getName()); assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s", - defaultGroupName, thread.getName()))); + defaultGroupName, + thread.getName()))); + assertTrue(reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=" + thread.getName() + ",task-id=all")); } @Test @@ -296,8 +332,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -423,8 +458,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -459,8 +493,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); 
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -610,8 +643,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -644,8 +676,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -672,8 +703,7 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -1449,8 +1479,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); - final StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, @@ -1489,8 +1518,7 @@ public class StreamThreadTest { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - final 
StreamThread.StreamsMetricsThreadImpl streamsMetrics - = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); final StreamThread thread = new StreamThread( mockTime, config, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index e1d6f0c086c..f381a58c123 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -22,7 +22,10 @@ import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.MockTime; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.junit.Test; import java.util.Collections; @@ -38,8 +41,34 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; -public class StreamsMetricsImplTest { +public class StreamsMetricsImplTest extends EasyMockSupport { + + private final static String SENSOR_PREFIX_DELIMITER = "."; + private final static String SENSOR_NAME_DELIMITER = ".s."; + private final static String INTERNAL_PREFIX = "internal"; + + @Test + public void shouldGetThreadLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final String threadName = "thread1"; + final String sensorName = "sensor1"; + final String expectedFullSensorName = + INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + 
threadName + SENSOR_NAME_DELIMITER + sensorName; + final RecordingLevel recordingLevel = RecordingLevel.DEBUG; + final Sensor[] parents = {}; + EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null); + + replayAll(); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel); + + verifyAll(); + + assertNull(sensor); + } @Test(expected = NullPointerException.class) public void testNullMetrics() { @@ -92,13 +121,13 @@ public class StreamsMetricsImplTest { final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); final int numberOfTaskMetrics = registry.metrics().size(); final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1); addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); @@ -108,13 +137,13 @@ public class StreamsMetricsImplTest { final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); final 
Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2); addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java new file mode 100644 index 00000000000..89395d9c51a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.powermock.api.easymock.PowerMock.createStrictMock; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verify; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(StreamsMetricsImpl.class) +public class ThreadMetricsTest { + + private static final String THREAD_LEVEL_GROUP = "stream-metrics"; + private static final String TASK_LEVEL_GROUP = "stream-task-metrics"; + + private final Metrics dummyMetrics = new Metrics(); + private final Sensor dummySensor = dummyMetrics.sensor("dummy"); + private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class); + private final Map dummyTagMap = Collections.singletonMap("hello", "world"); + + @Test + public void shouldGetCreateTaskSensor() { + final String operation = "task-created"; + final String totalDescription = "The total number of newly created tasks"; + final String rateDescription = "The average per-second number of newly created 
tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.createTaskSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCloseTaskSensor() { + final String operation = "task-closed"; + final String totalDescription = "The total number of closed tasks"; + final String rateDescription = "The average per-second number of closed tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.closeTaskSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCommitSensor() { + final String operation = "commit"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of commit calls"; + final String rateDescription = "The average per-second number of commit calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, 
THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.commitSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetPollSensor() { + final String operation = "poll"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of poll calls"; + final String rateDescription = "The average per-second number of poll calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.pollSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetProcessSensor() { + final String operation = "process"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of process calls"; + final String rateDescription = "The average per-second number of process calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, 
THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.processSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetPunctuateSensor() { + final String operation = "punctuate"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of punctuate calls"; + final String rateDescription = "The average per-second number of punctuate calls"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.punctuateSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetSkipRecordSensor() { + final String operation = "skipped-records"; + final String totalDescription = "The total number of skipped records"; + final String rateDescription = "The average per-second number of skipped records"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, 
totalDescription, rateDescription); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.skipRecordSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } + + @Test + public void shouldGetCommitOverTasksSensor() { + final String operation = "commit"; + final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; + final String totalDescription = "The total number of commit calls over all tasks"; + final String rateDescription = "The average per-second number of commit calls over all tasks"; + mockStatic(StreamsMetricsImpl.class); + expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor); + expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap); + StreamsMetricsImpl.addInvocationRateAndCount( + dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); + StreamsMetricsImpl.addAvgAndMax( + dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency); + + replayAll(); + replay(StreamsMetricsImpl.class); + + final Sensor sensor = ThreadMetrics.commitOverTasksSensor(streamsMetrics); + + verifyAll(); + verify(StreamsMetricsImpl.class); + + assertThat(sensor, is(dummySensor)); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index da2d46d6360..f48c31c1d91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -317,8 +317,7 @@ public class StreamThreadStateStoreProviderTest { stateDirectory, null, new MockTime(), - () -> clientSupplier.getProducer(new HashMap<>()), - metrics.sensor("dummy")) { + () -> 
clientSupplier.getProducer(new HashMap<>())) { @Override protected void updateOffsetLimits() {} }; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 38da0d82509..48c038c2d5f 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -32,6 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; @@ -277,10 +280,21 @@ public class TopologyTestDriver implements Closeable { .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); metrics = new Metrics(metricConfig, mockWallClockTime); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( - metrics, - "topology-test-driver-virtual-thread" - ); + + final String threadName = "topology-test-driver-virtual-thread"; + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); + final String threadLevelGroup = "stream-metrics"; + skippedRecordsSensor.add(new MetricName("skipped-records-rate", + threadLevelGroup, + "The average per-second number of skipped records", + streamsMetrics.tagMap()), + new Rate(TimeUnit.SECONDS, new Count())); + skippedRecordsSensor.add(new 
MetricName("skipped-records-total", + threadLevelGroup, + "The total number of skipped records", + streamsMetrics.tagMap()), + new Total()); final ThreadCache cache = new ThreadCache( new LogContext("topology-test-driver "), Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)), @@ -360,8 +374,7 @@ public class TopologyTestDriver implements Closeable { stateDirectory, cache, mockWallClockTime, - () -> producer, - metrics.sensor("dummy")); + () -> producer); task.initializeStateStores(); task.initializeTopology(); ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext( diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 34a7ed92c68..ed0274649bf 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import java.io.File; @@ -214,10 +215,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S this.stateDir = stateDir; final MetricConfig metricConfig = new MetricConfig(); metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); - this.metrics = new StreamsMetricsImpl( - new Metrics(metricConfig), - "mock-processor-context-virtual-thread" - ); + final String threadName = "mock-processor-context-virtual-thread"; + this.metrics = new StreamsMetricsImpl(new 
Metrics(metricConfig), threadName); + ThreadMetrics.skipRecordSensor(metrics); } @Override