diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 98e5d3bc52e..d55743a3336 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -170,6 +170,8 @@
files="StreamsPartitionAssignor.java"/>
+
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5c227d8bb04..9ff8097ca6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -116,7 +116,7 @@ public final class Sensor {
this.lastRecordTime = time.milliseconds();
this.recordingLevel = recordingLevel;
this.metricLock = new Object();
- checkForest(new HashSet());
+ checkForest(new HashSet<>());
}
/* Validate that this sensor doesn't end up referencing itself */
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index bbb8f2456d0..e609e270ecb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -20,17 +20,20 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.metrics.CompoundStat;
-import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A compound stat that reports one or more percentiles
*/
public class Percentiles extends SampledStat implements CompoundStat {
+ private final Logger log = LoggerFactory.getLogger(Percentiles.class);
+
public enum BucketSizing {
CONSTANT, LINEAR
}
@@ -38,6 +41,8 @@ public class Percentiles extends SampledStat implements CompoundStat {
private final int buckets;
private final Percentile[] percentiles;
private final BinScheme binScheme;
+ private final double min;
+ private final double max;
public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
this(sizeInBytes, 0.0, max, bucketing, percentiles);
@@ -47,6 +52,8 @@ public class Percentiles extends SampledStat implements CompoundStat {
super(0.0);
this.percentiles = percentiles;
this.buckets = sizeInBytes / 4;
+ this.min = min;
+ this.max = max;
if (bucketing == BucketSizing.CONSTANT) {
this.binScheme = new ConstantBinScheme(buckets, min, max);
} else if (bucketing == BucketSizing.LINEAR) {
@@ -63,11 +70,10 @@ public class Percentiles extends SampledStat implements CompoundStat {
List<NamedMeasurable> ms = new ArrayList<>(this.percentiles.length);
for (Percentile percentile : this.percentiles) {
final double pct = percentile.percentile();
- ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return value(config, now, pct / 100.0);
- }
- }));
+ ms.add(new NamedMeasurable(
+ percentile.name(),
+ (config, now) -> value(config, now, pct / 100.0))
+ );
}
return ms;
}
@@ -105,8 +111,21 @@ public class Percentiles extends SampledStat implements CompoundStat {
@Override
protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+ final double boundedValue;
+ if (value > max) {
+ log.warn("Received value {} which is greater than max recordable value {}, will be pinned to the max value",
+ value, max);
+ boundedValue = max;
+ } else if (value < min) {
+ log.warn("Received value {} which is less than min recordable value {}, will be pinned to the min value",
+ value, min);
+ boundedValue = min;
+ } else {
+ boundedValue = value;
+ }
+
HistogramSample hist = (HistogramSample) sample;
- hist.histogram.record(value);
+ hist.histogram.record(boundedValue);
}
private static class HistogramSample extends SampledStat.Sample {
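Note: with the pinning logic above, a Percentiles stat never feeds the histogram a value outside its configured range. A minimal sketch of the resulting behavior (hypothetical sensor and metric names, inside a test method):

    // Values recorded outside [0, 100] are pinned to the nearest bound,
    // with a warning logged, before they reach the histogram.
    Metrics metrics = new Metrics();
    Sensor sensor = metrics.sensor("example");
    sensor.add(new Percentiles(1000, 0.0, 100.0, BucketSizing.LINEAR,
        new Percentile(metrics.metricName("example.p50", "grp"), 50)));
    sensor.record(250.0);  // pinned to 100.0
    sensor.record(-50.0);  // pinned to 0.0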
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index 18dd6f271b0..369709e32e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {
public SampledStat(double initialValue) {
this.initialValue = initialValue;
- this.samples = new ArrayList(2);
+ this.samples = new ArrayList<>(2);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index a6f5d410baa..e57d4e51310 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
@@ -492,6 +493,88 @@ public class MetricsTest {
assertEquals(75, (Double) p75.metricValue(), 1.0);
}
+ @Test
+ public void shouldPinSmallerValuesToMin() {
+ final double min = 0.0d;
+ final double max = 100d;
+ Percentiles percs = new Percentiles(1000,
+ min,
+ max,
+ BucketSizing.LINEAR,
+ new Percentile(metrics.metricName("test.p50", "grp1"), 50));
+ MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+ Sensor sensor = metrics.sensor("test", config);
+ sensor.add(percs);
+ Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
+
+ sensor.record(min - 100);
+ sensor.record(min - 100);
+ assertEquals(min, (double) p50.metricValue(), 0d);
+ }
+
+ @Test
+ public void shouldPinLargerValuesToMax() {
+ final double min = 0.0d;
+ final double max = 100d;
+ Percentiles percs = new Percentiles(1000,
+ min,
+ max,
+ BucketSizing.LINEAR,
+ new Percentile(metrics.metricName("test.p50", "grp1"), 50));
+ MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+ Sensor sensor = metrics.sensor("test", config);
+ sensor.add(percs);
+ Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
+
+ sensor.record(max + 100);
+ sensor.record(max + 100);
+ assertEquals(max, (double) p50.metricValue(), 0d);
+ }
+
+ @Test
+ public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+ long seed = new Random().nextLong();
+ int sizeInBytes = 1000 * 1000; // 1MB
+ long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+ try {
+ Random prng = new Random(seed);
+ int numberOfValues = 5000 + prng.nextInt(10_000); // range is [5000, 15000)
+
+ Percentiles percs = new Percentiles(sizeInBytes,
+ maximumValue,
+ BucketSizing.LINEAR,
+ new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+ new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+ MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+ Sensor sensor = metrics.sensor("test", config);
+ sensor.add(percs);
+ Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+ Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+ final List<Long> values = new ArrayList<>(numberOfValues);
+ // record two windows worth of sequential values
+ for (int i = 0; i < numberOfValues; ++i) {
+ long value = (Math.abs(prng.nextLong()) - 1) % maximumValue;
+ values.add(value);
+ sensor.record(value);
+ }
+
+ Collections.sort(values);
+
+ int p90Index = (int) Math.ceil(((double) (90 * numberOfValues)) / 100);
+ int p99Index = (int) Math.ceil(((double) (99 * numberOfValues)) / 100);
+
+ double expectedP90 = values.get(p90Index - 1);
+ double expectedP99 = values.get(p99Index - 1);
+
+ assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);
+ assertEquals(expectedP99, (Double) p99.metricValue(), expectedP99 / 10);
+ } catch (AssertionError e) {
+ throw new AssertionError("Assertion failed in randomized test. Reproduce with seed = " + seed + ".", e);
+ }
+ }
+
@Test
public void testRateWindowing() throws Exception {
// Use the default time window. Set 3 samples
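Note: the randomized test above derives its expected values with the nearest-rank method on the sorted samples. A worked instance of the index arithmetic, assuming exactly 10,000 samples:

    int numberOfValues = 10_000;
    int p90Index = (int) Math.ceil(((double) (90 * numberOfValues)) / 100); // 9000
    int p99Index = (int) Math.ceil(((double) (99 * numberOfValues)) / 100); // 9900
    // values.get(p90Index - 1) is the 9000th smallest sample; for values drawn
    // uniformly from [0, maximumValue) this lands near 0.9 * maximumValue, and
    // the assertions allow a 10% relative error around it.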
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 87662963eb7..19bbbdfa528 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -224,6 +224,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final V value) {
setCurrentNode(child);
child.process(key, value);
+ if (child.isTerminalNode()) {
+ streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index f37ff2b7d7d..a91eb6fa916 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -161,6 +161,10 @@ public class ProcessorNode {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateSensor);
}
+ public boolean isTerminalNode() {
+ return children.isEmpty();
+ }
+
/**
* @return a string representation of this node, useful for debugging.
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 2c9c7edad48..14973290b5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -29,6 +29,7 @@ public class ProcessorTopology {
private final List<ProcessorNode<?, ?>> processorNodes;
private final Map<String, SourceNode<?, ?>> sourcesByTopic;
private final Map<String, SinkNode<?, ?>> sinksByTopic;
+ private final Set<String> terminalNodes;
private final List<StateStore> stateStores;
private final Set<String> repartitionTopics;
@@ -50,6 +51,13 @@ public class ProcessorTopology {
this.globalStateStores = Collections.unmodifiableList(globalStateStores);
this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
+
+ this.terminalNodes = new HashSet<>();
+ for (final ProcessorNode<?, ?> node : processorNodes) {
+ if (node.isTerminalNode()) {
+ terminalNodes.add(node.name());
+ }
+ }
}
public Set<String> sourceTopics() {
@@ -72,6 +80,10 @@ public class ProcessorTopology {
return sinksByTopic.get(topic);
}
+ public Set<String> terminalNodes() {
+ return terminalNodes;
+ }
+
public List<ProcessorNode<?, ?>> processors() {
return processorNodes;
}
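Note: a node is terminal exactly when it has no children to forward to, so the set covers sink nodes as well as processors with no downstream nodes. Computing it once at construction makes the per-record check in ProcessorContextImpl a set lookup. For the topology built in ProcessorTopologyTest below:

    // source-1 -> processor-1 -> sink-1
    // source-1 -> processor-2
    // source-2 -> processor-2
    // processor-2 and sink-1 have no children, so
    // terminalNodes() == {"processor-2", "sink-1"}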
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5b937dbb8b9..cf1f4433976 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@@ -37,6 +38,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -97,6 +99,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
private final Sensor enforcedProcessingSensor;
+ private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
private final InternalProcessorContext processorContext;
private final RecordQueueCreator recordQueueCreator;
@@ -142,6 +145,21 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+ for (final String terminalNode : topology.terminalNodes()) {
+ e2eLatencySensors.put(
+ terminalNode,
+ ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
+ );
+ }
+
+ for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
+ final String processorId = sourceNode.name();
+ e2eLatencySensors.put(
+ processorId,
+ ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
+ );
+ }
+
streamTimePunctuationQueue = new PunctuationQueue();
systemTimePunctuationQueue = new PunctuationQueue();
maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
@@ -587,6 +605,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
log.trace("Start processing one record [{}]", record);
updateProcessorContext(record, currNode, wallClockTime);
+ maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
log.trace("Completed processing one record [{}]", record);
@@ -900,6 +919,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return punctuated;
}
+ void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+ maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+ }
+
+ private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+ final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+ if (e2eLatencySensor == null) {
+ throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+ } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+ e2eLatencySensor.record(now - recordTimestamp, now);
+ }
+ }
+
/**
* Request committing the current task's state
*/
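Note: the end-to-end latency recorded here is wall-clock time minus the record's event timestamp, taken at source nodes when processing starts and at terminal nodes once the record has been fully forwarded. A minimal sketch of the measurement (hypothetical variables, millisecond timestamps):

    long recordTimestamp = 1_000L;                 // event time carried by the record
    long now = time.milliseconds();                // say 1_250L at a terminal node
    double e2eLatencyMs = now - recordTimestamp;   // 250.0 ms end-to-end
    e2eLatencySensor.record(e2eLatencyMs, now);    // feeds min/max/p99/p90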
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
index c157d833e35..f35d5dd69c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
@@ -29,6 +29,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor;
public class ProcessorNodeMetrics {
private ProcessorNodeMetrics() {}
@@ -98,6 +99,15 @@ public class ProcessorNodeMetrics {
private static final String LATE_RECORD_DROP_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + LATE_RECORD_DROP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+ private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+ private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
+ "end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ + "system time when it has been fully processed by the node";
+ private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+ private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+ private static final String RECORD_E2E_LATENCY_P99_DESCRIPTION = "The 99th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+ private static final String RECORD_E2E_LATENCY_P90_DESCRIPTION = "The 90th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+
public static Sensor suppressionEmitSensor(final String threadId,
final String taskId,
final String processorNodeId,
@@ -289,6 +299,26 @@ public class ProcessorNodeMetrics {
return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
}
+ public static Sensor recordE2ELatencySensor(final String threadId,
+ final String taskId,
+ final String processorNodeId,
+ final RecordingLevel recordingLevel,
+ final StreamsMetricsImpl streamsMetrics) {
+ final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, RECORD_E2E_LATENCY, recordingLevel);
+ final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+ addMinAndMaxAndP99AndP90ToSensor(
+ sensor,
+ PROCESSOR_NODE_LEVEL_GROUP,
+ tagMap,
+ RECORD_E2E_LATENCY,
+ RECORD_E2E_LATENCY_MIN_DESCRIPTION,
+ RECORD_E2E_LATENCY_MAX_DESCRIPTION,
+ RECORD_E2E_LATENCY_P99_DESCRIPTION,
+ RECORD_E2E_LATENCY_P90_DESCRIPTION
+ );
+ return sensor;
+ }
+
private static Sensor throughputAndLatencySensorWithParent(final String threadId,
final String taskId,
final String processorNodeId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index d3d1ecd511e..cf77126f359 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -28,6 +28,9 @@ import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentile;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
@@ -128,6 +131,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String RATE_SUFFIX = "-rate";
public static final String TOTAL_SUFFIX = "-total";
public static final String RATIO_SUFFIX = "-ratio";
+ public static final String P99_SUFFIX = "-p99";
+ public static final String P90_SUFFIX = "-p90";
public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
@@ -149,6 +154,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";
+ private static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
+ private static final double MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000d; // maximum latency is 10 days; values above that will be pinned
+
public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
@@ -642,6 +650,54 @@ public class StreamsMetricsImpl implements StreamsMetrics {
);
}
+ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation,
+ final String descriptionOfMin,
+ final String descriptionOfMax,
+ final String descriptionOfP99,
+ final String descriptionOfP90) {
+ sensor.add(
+ new MetricName(
+ operation + MIN_SUFFIX,
+ group,
+ descriptionOfMin,
+ tags),
+ new Min()
+ );
+
+ sensor.add(
+ new MetricName(
+ operation + MAX_SUFFIX,
+ group,
+ descriptionOfMax,
+ tags),
+ new Max()
+ );
+
+ sensor.add(
+ new Percentiles(
+ PERCENTILES_SIZE_IN_BYTES,
+ MAXIMUM_E2E_LATENCY,
+ BucketSizing.LINEAR,
+ new Percentile(
+ new MetricName(
+ operation + P99_SUFFIX,
+ group,
+ descriptionOfP99,
+ tags),
+ 99),
+ new Percentile(
+ new MetricName(
+ operation + P90_SUFFIX,
+ group,
+ descriptionOfP90,
+ tags),
+ 90))
+ );
+ }
+
public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
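Note: each call to the new helper registers four metrics on a single sensor, with the p99/p90 pair sharing one 1 MB linear histogram spanning 0 to 10 days. A hedged usage sketch (group, tags, and descriptions abbreviated; the real call site is recordE2ELatencySensor):

    addMinAndMaxAndP99AndP90ToSensor(
        sensor,
        "stream-processor-node-metrics",   // group
        tags,
        "record-e2e-latency",              // metric name prefix
        "min description", "max description",
        "p99 description", "p90 description"
    );
    // Registers record-e2e-latency-min, -max, -p99, and -p90 on the sensor.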
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 21000c40d71..f06057f4849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -204,6 +204,10 @@ public class MetricsIntegrationTest {
private static final String SUPPRESSION_BUFFER_COUNT_MAX = "suppression-buffer-count-max";
private static final String EXPIRED_WINDOW_RECORD_DROP_RATE = "expired-window-record-drop-rate";
private static final String EXPIRED_WINDOW_RECORD_DROP_TOTAL = "expired-window-record-drop-total";
+ private static final String E2E_LATENCY_MIN = "record-e2e-latency-min";
+ private static final String E2E_LATENCY_MAX = "record-e2e-latency-max";
+ private static final String E2E_LATENCY_P99 = "record-e2e-latency-p99";
+ private static final String E2E_LATENCY_P90 = "record-e2e-latency-p90";
// stores name
private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
@@ -582,6 +586,8 @@ public class MetricsIntegrationTest {
final int numberOfRemovedMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 18 : 0;
final int numberOfModifiedProcessMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 18 : 4;
final int numberOfModifiedForwardMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 8 : 0;
+ final int numberOfSourceNodes = 4;
+ final int numberOfTerminalNodes = 4;
checkMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, numberOfRemovedMetrics);
@@ -600,6 +606,10 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricProcessor, DESTROY_TOTAL, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, FORWARD_TOTAL, numberOfModifiedForwardMetrics);
checkMetricByName(listMetricProcessor, FORWARD_RATE, numberOfModifiedForwardMetrics);
+ checkMetricByName(listMetricProcessor, E2E_LATENCY_MIN, numberOfSourceNodes + numberOfTerminalNodes);
+ checkMetricByName(listMetricProcessor, E2E_LATENCY_MAX, numberOfSourceNodes + numberOfTerminalNodes);
+ checkMetricByName(listMetricProcessor, E2E_LATENCY_P99, numberOfSourceNodes + numberOfTerminalNodes);
+ checkMetricByName(listMetricProcessor, E2E_LATENCY_P90, numberOfSourceNodes + numberOfTerminalNodes);
}
private void checkKeyValueStoreMetrics(final String group0100To24,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 7442b1a84a7..20c1fa3b37c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -450,6 +450,8 @@ public class ActiveTaskCreatorTest {
expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
expect(sourceNode.getTimestampExtractor()).andReturn(mock(TimestampExtractor.class)).anyTimes();
expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
+ expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
+ expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
replay(builder, stateDirectory, topology, sourceNode);
activeTaskCreator = new ActiveTaskCreator(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 76f2de931e4..baa59f32d8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -58,6 +58,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -135,6 +136,19 @@ public class ProcessorTopologyTest {
assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3"));
}
+ @Test
+ public void shouldGetTerminalNodes() {
+ topology.addSource("source-1", "topic-1");
+ topology.addSource("source-2", "topic-2", "topic-3");
+ topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
+ topology.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
+ topology.addSink("sink-1", "topic-3", "processor-1");
+
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+ assertThat(processorTopology.terminalNodes(), equalTo(mkSet("processor-2", "sink-1")));
+ }
+
@Test
public void testDrivingSimpleTopology() {
final int partition = 10;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c903c8e89e4..49cba19e1a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@@ -83,6 +84,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -133,11 +135,12 @@ public class StreamTaskTest {
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
- private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
- private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0);
- private final MockTime time = new MockTime();
+
+ private MockTime time = new MockTime();
+ private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
+ private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
private StateDirectory stateDirectory;
private StreamTask task;
@@ -420,6 +423,99 @@ public class StreamTaskTest {
assertThat(metric.metricValue(), equalTo(1.0d));
}
+ @Test
+ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
+ time = new MockTime(0L, 0L, 0L);
+ metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+ final String sourceNode = source1.name();
+
+ final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+
+ // e2e latency = 100
+ task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
+ task.process(100L);
+
+ assertThat(maxMetric.metricValue(), equalTo(100d));
+ }
+
+ @Test
+ public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
+ time = new MockTime(0L, 0L, 0L);
+ metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+ final String terminalNode = processorStreamTime.name();
+
+ final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
+
+ // e2e latency = 100
+ time.setCurrentTimeMs(100L);
+ task.maybeRecordE2ELatency(0L, terminalNode);
+
+ assertThat(maxMetric.metricValue(), equalTo(100d));
+ }
+
+ @Test
+ public void shouldRecordE2ELatencyMinAndMax() {
+ time = new MockTime(0L, 0L, 0L);
+ metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+ final String sourceNode = source1.name();
+
+ final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+ final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+
+ assertThat(minMetric.metricValue(), equalTo(Double.NaN));
+ assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
+
+ // e2e latency = 10
+ time.setCurrentTimeMs(10L);
+ task.maybeRecordE2ELatency(0L, sourceNode);
+ assertThat(minMetric.metricValue(), equalTo(10d));
+ assertThat(maxMetric.metricValue(), equalTo(10d));
+
+ // e2e latency = 15
+ time.setCurrentTimeMs(25L);
+ task.maybeRecordE2ELatency(10L, sourceNode);
+ assertThat(minMetric.metricValue(), equalTo(10d));
+ assertThat(maxMetric.metricValue(), equalTo(15d));
+
+ // e2e latency = 25
+ time.setCurrentTimeMs(30L);
+ task.maybeRecordE2ELatency(5L, sourceNode);
+ assertThat(minMetric.metricValue(), equalTo(10d));
+ assertThat(maxMetric.metricValue(), equalTo(25d));
+
+ // e2e latency = 5
+ time.setCurrentTimeMs(40L);
+ task.maybeRecordE2ELatency(35L, sourceNode);
+ assertThat(minMetric.metricValue(), equalTo(5d));
+ assertThat(maxMetric.metricValue(), equalTo(25d));
+ }
+
+ @Test
+ public void shouldRecordE2ELatencyPercentiles() {
+ time = new MockTime(0L, 0L, 0L);
+ metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+ final String sourceNode = source1.name();
+
+ final Metric p99Metric = getProcessorMetric("record-e2e-latency", "%s-p99", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+ final Metric p90Metric = getProcessorMetric("record-e2e-latency", "%s-p90", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+
+ for (int i = 0; i < 100; i++) {
+ time.setCurrentTimeMs(i);
+ task.maybeRecordE2ELatency(0L, sourceNode);
+ }
+
+ assertEquals(99d, (double) p99Metric.metricValue(), 5.0);
+ assertEquals(90d, (double) p90Metric.metricValue(), 5.0);
+ }
+
@Test
public void shouldConstructMetricsWithBuiltInMetricsVersion0100To24() {
testMetrics(StreamsConfig.METRICS_0100_TO_24);
@@ -547,6 +643,28 @@ public class StreamTaskTest {
));
}
+ private Metric getProcessorMetric(final String operation,
+ final String nameFormat,
+ final String taskId,
+ final String processorNodeId,
+ final String builtInMetricsVersion) {
+
+ return getMetricByNameFilterByTags(
+ metrics.metrics(),
+ String.format(nameFormat, operation),
+ "stream-processor-node-metrics",
+ mkMap(
+ mkEntry("task-id", taskId),
+ mkEntry("processor-node-id", processorNodeId),
+ mkEntry(
+ StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG
+ : THREAD_ID_TAG_0100_TO_24,
+ Thread.currentThread().getName()
+ )
+ )
+ );
+ }
+
@Test
public void shouldPauseAndResumeBasedOnBufferedRecords() {
task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
index c48c6e9e964..25c4d71d41a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
-import org.apache.kafka.streams.kstream.internals.metrics.TaskMetricsTest;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
index 8563167097a..14f370da807 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
@@ -260,6 +261,38 @@ public class ProcessorNodeMetricsTest {
}
}
+ @Test
+ public void shouldGetRecordE2ELatencySensor() {
+ final String operation = "record-e2e-latency";
+ final String recordE2ELatencyMinDescription =
+ "The minimum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ + "system time when it has been fully processed by the node";
+ final String recordE2ELatencyMaxDescription =
+ "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ + "system time when it has been fully processed by the node";
+ final String recordE2ELatencyP99Description =
+ "The 99th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ + "system time when it has been fully processed by the node";
+ final String recordE2ELatencyP90Description =
+ "The 90th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ + "system time when it has been fully processed by the node";
+ expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
+ .andReturn(expectedSensor);
+ expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+ StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
+ expectedSensor,
+ PROCESSOR_NODE_LEVEL_GROUP,
+ tagMap,
+ operation,
+ recordE2ELatencyMinDescription,
+ recordE2ELatencyMaxDescription,
+ recordE2ELatencyP99Description,
+ recordE2ELatencyP90Description
+ );
+
+ verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
+ }
+
private void shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(final String metricNamePrefix,
final String descriptionOfRate,
final String descriptionOfCount,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 75057644d81..47fec029700 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -114,6 +114,7 @@ public class StreamsMetricsImplTest {
private final String description1 = "description number one";
private final String description2 = "description number two";
private final String description3 = "description number three";
+ private final String description4 = "description number four";
private final Map<String, String> clientLevelTags = mkMap(mkEntry("client-id", CLIENT_ID));
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags);
@@ -997,6 +998,20 @@ public class StreamsMetricsImplTest {
assertThat(metrics.metrics().size(), equalTo(3 + 1)); // one metric is added automatically in the constructor of Metrics
}
+ @Test
+ public void shouldAddMinAndMaxAndP99AndP90MetricsToSensor() {
+ StreamsMetricsImpl
+ .addMinAndMaxAndP99AndP90ToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3, description4);
+
+ final double valueToRecord1 = 18.0;
+ final double valueToRecord2 = 42.0;
+ verifyMetric(metricNamePrefix + "-min", description1, valueToRecord1, valueToRecord2, valueToRecord1);
+ verifyMetric(metricNamePrefix + "-max", description2, valueToRecord1, valueToRecord2, valueToRecord2);
+ verifyMetricWithinError(metricNamePrefix + "-p99", description3, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
+ verifyMetricWithinError(metricNamePrefix + "-p90", description4, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
+ assertThat(metrics.metrics().size(), equalTo(4 + 1)); // one metric is added automatically in the constructor of Metrics
+ }
+
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
@@ -1014,10 +1029,10 @@ public class StreamsMetricsImplTest {
}
private void verifyMetric(final String name,
- final String description,
- final double valueToRecord1,
- final double valueToRecord2,
- final double expectedMetricValue) {
+ final String description,
+ final double valueToRecord1,
+ final double valueToRecord2,
+ final double expectedMetricValue) {
final KafkaMetric metric = metrics
.metric(new MetricName(name, group, description, tags));
assertThat(metric, is(notNullValue()));
@@ -1030,6 +1045,25 @@ public class StreamsMetricsImplTest {
);
}
+ private void verifyMetricWithinError(final String name,
+ final String description,
+ final double valueToRecord1,
+ final double valueToRecord2,
+ final double expectedMetricValue,
+ final double acceptableError) {
+ final KafkaMetric metric = metrics
+ .metric(new MetricName(name, group, description, tags));
+ assertThat(metric, is(notNullValue()));
+ assertThat(metric.metricName().description(), equalTo(description));
+ sensor.record(valueToRecord1, time.milliseconds());
+ sensor.record(valueToRecord2, time.milliseconds());
+ assertEquals(
+ expectedMetricValue,
+ metric.measurable().measure(new MetricConfig(), time.milliseconds()),
+ acceptableError
+ );
+ }
+
@Test
public void shouldMeasureLatency() {
final long startTime = 6;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
similarity index 97%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
index 09204d903bd..918a5aa26ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
@@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import org.junit.Before;
import org.junit.Test;