KAFKA-9983: KIP-613: add INFO level e2e latency metrics (#8697)

Add e2e latency metrics at the beginning and end of task topologies
as INFO-level processor-node-level metrics.

Implements: KIP-613
Reviewers: John Roesler <vvcephei@apache.org>, Andrew Choi <a24choi@edu.uwaterloo.ca>, Bruno Cadonna <cadonna@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Author: A. Sophie Blee-Goldman, 2020-05-27 13:55:29 -07:00 (committed by GitHub)
Parent: 1672a75e1f
Commit: 83c616f706
19 changed files with 470 additions and 22 deletions
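
At its core, the new metric measures end-to-end (e2e) latency as the difference between the wall-clock time at which a record has been fully processed by a source or terminal node and the record's own timestamp (see StreamTask#maybeRecordE2ELatency in the diff below). A minimal sketch of that computation, standalone and with an illustrative class name:

import org.apache.kafka.common.utils.Time;

// Illustrative only: mirrors the computation done in StreamTask#maybeRecordE2ELatency,
// where the sensor records (now - recordTimestamp) for source and terminal nodes.
public final class E2ELatencyExample {

    public static long e2eLatencyMs(final long recordTimestampMs, final Time time) {
        final long now = time.milliseconds();
        // May be negative if the record timestamp is ahead of the wall clock (e.g. clock
        // drift); min/max see the raw value, while the percentile histogram pins
        // out-of-range values to its configured bounds.
        return now - recordTimestampMs;
    }

    public static void main(final String[] args) {
        // A record stamped 250 ms ago yields an e2e latency of roughly 250 ms.
        System.out.println(e2eLatencyMs(System.currentTimeMillis() - 250, Time.SYSTEM));
    }
}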

View File

@ -170,6 +170,8 @@
files="StreamsPartitionAssignor.java"/>
<suppress checks="JavaNCSS"
files="EosBetaUpgradeIntegrationTest.java"/>
<suppress checks="StaticVariableName"
files="StreamsMetricsImpl.java"/>
<suppress checks="NPathComplexity"
files="(AssignorConfiguration|EosBetaUpgradeIntegrationTest|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

View File

@ -116,7 +116,7 @@ public final class Sensor {
this.lastRecordTime = time.milliseconds();
this.recordingLevel = recordingLevel;
this.metricLock = new Object();
checkForest(new HashSet<Sensor>());
checkForest(new HashSet<>());
}
/* Validate that this sensor doesn't end up referencing itself */

View File

@ -20,17 +20,20 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A compound stat that reports one or more percentiles
*/
public class Percentiles extends SampledStat implements CompoundStat {
private final Logger log = LoggerFactory.getLogger(Percentiles.class);
public enum BucketSizing {
CONSTANT, LINEAR
}
@ -38,6 +41,8 @@ public class Percentiles extends SampledStat implements CompoundStat {
private final int buckets;
private final Percentile[] percentiles;
private final BinScheme binScheme;
private final double min;
private final double max;
public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
this(sizeInBytes, 0.0, max, bucketing, percentiles);
@ -47,6 +52,8 @@ public class Percentiles extends SampledStat implements CompoundStat {
super(0.0);
this.percentiles = percentiles;
this.buckets = sizeInBytes / 4;
this.min = min;
this.max = max;
if (bucketing == BucketSizing.CONSTANT) {
this.binScheme = new ConstantBinScheme(buckets, min, max);
} else if (bucketing == BucketSizing.LINEAR) {
@ -63,11 +70,10 @@ public class Percentiles extends SampledStat implements CompoundStat {
List<NamedMeasurable> ms = new ArrayList<>(this.percentiles.length);
for (Percentile percentile : this.percentiles) {
final double pct = percentile.percentile();
ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
public double measure(MetricConfig config, long now) {
return value(config, now, pct / 100.0);
}
}));
ms.add(new NamedMeasurable(
percentile.name(),
(config, now) -> value(config, now, pct / 100.0))
);
}
return ms;
}
@ -105,8 +111,21 @@ public class Percentiles extends SampledStat implements CompoundStat {
@Override
protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
final double boundedValue;
if (value > max) {
log.warn("Received value {} which is greater than max recordable value {}, will be pinned to the max value",
value, max);
boundedValue = max;
} else if (value < min) {
log.warn("Received value {} which is less than min recordable value {}, will be pinned to the min value",
value, min);
boundedValue = min;
} else {
boundedValue = value;
}
HistogramSample hist = (HistogramSample) sample;
hist.histogram.record(value);
hist.histogram.record(boundedValue);
}
private static class HistogramSample extends SampledStat.Sample {
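
Outside of the unit tests further below, the bounded recording behaviour added here can be exercised directly against the metrics API. A minimal sketch, assuming a standalone Metrics registry and illustrative sensor/metric names:

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public final class PercentilesPinningExample {
    public static void main(final String[] args) {
        final Metrics metrics = new Metrics(new MetricConfig());
        final Sensor sensor = metrics.sensor("example");
        final MetricName p50 = metrics.metricName("example.p50", "example-group");
        // Histogram bounded to [0, 100]; values outside the range are pinned and a warning is logged.
        sensor.add(new Percentiles(1000, 0.0, 100.0, BucketSizing.LINEAR, new Percentile(p50, 50)));
        sensor.record(-50.0);  // pinned to 0.0
        sensor.record(500.0);  // pinned to 100.0
        System.out.println(metrics.metric(p50).metricValue());
        metrics.close();
    }
}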

View File

@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {
public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<Sample>(2);
this.samples = new ArrayList<>(2);
}
@Override

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
@ -492,6 +493,88 @@ public class MetricsTest {
assertEquals(75, (Double) p75.metricValue(), 1.0);
}
@Test
public void shouldPinSmallerValuesToMin() {
final double min = 0.0d;
final double max = 100d;
Percentiles percs = new Percentiles(1000,
min,
max,
BucketSizing.LINEAR,
new Percentile(metrics.metricName("test.p50", "grp1"), 50));
MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
Sensor sensor = metrics.sensor("test", config);
sensor.add(percs);
Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
sensor.record(min - 100);
sensor.record(min - 100);
assertEquals(min, (double) p50.metricValue(), 0d);
}
@Test
public void shouldPinLargerValuesToMax() {
final double min = 0.0d;
final double max = 100d;
Percentiles percs = new Percentiles(1000,
min,
max,
BucketSizing.LINEAR,
new Percentile(metrics.metricName("test.p50", "grp1"), 50));
MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
Sensor sensor = metrics.sensor("test", config);
sensor.add(percs);
Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
sensor.record(max + 100);
sensor.record(max + 100);
assertEquals(max, (double) p50.metricValue(), 0d);
}
@Test
public void testPercentilesWithRandomNumbersAndLinearBucketing() {
long seed = new Random().nextLong();
int sizeInBytes = 1000 * 1000; // 1MB
long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
try {
Random prng = new Random(seed);
int numberOfValues = 5000 + prng.nextInt(10_000); // range is [5000, 15000)
Percentiles percs = new Percentiles(sizeInBytes,
maximumValue,
BucketSizing.LINEAR,
new Percentile(metrics.metricName("test.p90", "grp1"), 90),
new Percentile(metrics.metricName("test.p99", "grp1"), 99));
MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
Sensor sensor = metrics.sensor("test", config);
sensor.add(percs);
Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
final List<Long> values = new ArrayList<>(numberOfValues);
// record the randomly generated values
for (int i = 0; i < numberOfValues; ++i) {
long value = (Math.abs(prng.nextLong()) - 1) % maximumValue;
values.add(value);
sensor.record(value);
}
Collections.sort(values);
int p90Index = (int) Math.ceil(((double) (90 * numberOfValues)) / 100);
int p99Index = (int) Math.ceil(((double) (99 * numberOfValues)) / 100);
double expectedP90 = values.get(p90Index - 1);
double expectedP99 = values.get(p99Index - 1);
assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);
assertEquals(expectedP99, (Double) p99.metricValue(), expectedP99 / 10);
} catch (AssertionError e) {
throw new AssertionError("Assertion failed in randomized test. Reproduce with seed = " + seed + " .", e);
}
}
@Test
public void testRateWindowing() throws Exception {
// Use the default time window. Set 3 samples

View File

@ -224,6 +224,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final V value) {
setCurrentNode(child);
child.process(key, value);
if (child.isTerminalNode()) {
streamTask.maybeRecordE2ELatency(timestamp(), child.name());
}
}
@Override

View File

@ -161,6 +161,10 @@ public class ProcessorNode<K, V> {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateSensor);
}
public boolean isTerminalNode() {
return children.isEmpty();
}
/**
* @return a string representation of this node, useful for debugging.
*/

View File

@ -29,6 +29,7 @@ public class ProcessorTopology {
private final List<ProcessorNode<?, ?>> processorNodes;
private final Map<String, SourceNode<?, ?>> sourcesByTopic;
private final Map<String, SinkNode<?, ?>> sinksByTopic;
private final Set<String> terminalNodes;
private final List<StateStore> stateStores;
private final Set<String> repartitionTopics;
@ -50,6 +51,13 @@ public class ProcessorTopology {
this.globalStateStores = Collections.unmodifiableList(globalStateStores);
this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
this.terminalNodes = new HashSet<>();
for (final ProcessorNode<?, ?> node : processorNodes) {
if (node.isTerminalNode()) {
terminalNodes.add(node.name());
}
}
}
public Set<String> sourceTopics() {
@ -72,6 +80,10 @@ public class ProcessorTopology {
return sinksByTopic.get(topic);
}
public Set<String> terminalNodes() {
return terminalNodes;
}
public List<ProcessorNode<?, ?>> processors() {
return processorNodes;
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@ -37,6 +38,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -97,6 +99,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
private final Sensor enforcedProcessingSensor;
private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
private final InternalProcessorContext processorContext;
private final RecordQueueCreator recordQueueCreator;
@ -142,6 +145,21 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
for (final String terminalNode : topology.terminalNodes()) {
e2eLatencySensors.put(
terminalNode,
ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
);
}
for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
final String processorId = sourceNode.name();
e2eLatencySensors.put(
processorId,
ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
);
}
streamTimePunctuationQueue = new PunctuationQueue();
systemTimePunctuationQueue = new PunctuationQueue();
maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
@ -587,6 +605,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
log.trace("Start processing one record [{}]", record);
updateProcessorContext(record, currNode, wallClockTime);
maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
log.trace("Completed processing one record [{}]", record);
@ -900,6 +919,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return punctuated;
}
void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
}
private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
if (e2eLatencySensor == null) {
throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
} else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
e2eLatencySensor.record(now - recordTimestamp, now);
}
}
/**
* Request committing the current task's state
*/

View File

@ -29,6 +29,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor;
public class ProcessorNodeMetrics {
private ProcessorNodeMetrics() {}
@ -98,6 +99,15 @@ public class ProcessorNodeMetrics {
private static final String LATE_RECORD_DROP_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + LATE_RECORD_DROP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
"end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ "system time when it has been fully processed by the node";
private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
private static final String RECORD_E2E_LATENCY_P99_DESCRIPTION = "The 99th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
private static final String RECORD_E2E_LATENCY_P90_DESCRIPTION = "The 90th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
public static Sensor suppressionEmitSensor(final String threadId,
final String taskId,
final String processorNodeId,
@ -289,6 +299,26 @@ public class ProcessorNodeMetrics {
return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
}
public static Sensor recordE2ELatencySensor(final String threadId,
final String taskId,
final String processorNodeId,
final RecordingLevel recordingLevel,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, RECORD_E2E_LATENCY, recordingLevel);
final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
addMinAndMaxAndP99AndP90ToSensor(
sensor,
PROCESSOR_NODE_LEVEL_GROUP,
tagMap,
RECORD_E2E_LATENCY,
RECORD_E2E_LATENCY_MIN_DESCRIPTION,
RECORD_E2E_LATENCY_MAX_DESCRIPTION,
RECORD_E2E_LATENCY_P99_DESCRIPTION,
RECORD_E2E_LATENCY_P90_DESCRIPTION
);
return sensor;
}
private static Sensor throughputAndLatencySensorWithParent(final String threadId,
final String taskId,
final String processorNodeId,
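
Once a topology is running, these sensors surface as record-e2e-latency-min/-max/-p99/-p90 in the stream-processor-node-metrics group, tagged with the thread, task, and processor node ids. A sketch of reading them from an application, assuming a KafkaStreams instance built elsewhere (the helper class name is illustrative):

import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class E2ELatencyMetricsReader {

    // Prints every record-e2e-latency metric currently registered on the instance,
    // e.g. "my-sink record-e2e-latency-max = 42.0".
    public static void printE2ELatencyMetrics(final KafkaStreams streams) {
        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (final MetricName name : metrics.keySet()) {
            if ("stream-processor-node-metrics".equals(name.group())
                    && name.name().startsWith("record-e2e-latency")) {
                System.out.println(name.tags().get("processor-node-id")
                        + " " + name.name() + " = " + metrics.get(name).metricValue());
            }
        }
    }
}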

View File

@ -28,6 +28,9 @@ import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
@ -128,6 +131,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String RATE_SUFFIX = "-rate";
public static final String TOTAL_SUFFIX = "-total";
public static final String RATIO_SUFFIX = "-ratio";
public static final String P99_SUFFIX = "-p99";
public static final String P90_SUFFIX = "-p90";
public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
@ -149,6 +154,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";
private static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
private static double MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000d; // maximum latency is 10 days; values above that will be pinned
public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
@ -642,6 +650,54 @@ public class StreamsMetricsImpl implements StreamsMetrics {
);
}
public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation,
final String descriptionOfMin,
final String descriptionOfMax,
final String descriptionOfP99,
final String descriptionOfP90) {
sensor.add(
new MetricName(
operation + MIN_SUFFIX,
group,
descriptionOfMin,
tags),
new Min()
);
sensor.add(
new MetricName(
operation + MAX_SUFFIX,
group,
descriptionOfMax,
tags),
new Max()
);
sensor.add(
new Percentiles(
PERCENTILES_SIZE_IN_BYTES,
MAXIMUM_E2E_LATENCY,
BucketSizing.LINEAR,
new Percentile(
new MetricName(
operation + P99_SUFFIX,
group,
descriptionOfP99,
tags),
99),
new Percentile(
new MetricName(
operation + P90_SUFFIX,
group,
descriptionOfP90,
tags),
90))
);
}
public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,

View File

@ -204,6 +204,10 @@ public class MetricsIntegrationTest {
private static final String SUPPRESSION_BUFFER_COUNT_MAX = "suppression-buffer-count-max";
private static final String EXPIRED_WINDOW_RECORD_DROP_RATE = "expired-window-record-drop-rate";
private static final String EXPIRED_WINDOW_RECORD_DROP_TOTAL = "expired-window-record-drop-total";
private static final String E2E_LATENCY_MIN = "record-e2e-latency-min";
private static final String E2E_LATENCY_MAX = "record-e2e-latency-max";
private static final String E2E_LATENCY_P99 = "record-e2e-latency-p99";
private static final String E2E_LATENCY_P90 = "record-e2e-latency-p90";
// stores name
private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
@ -582,6 +586,8 @@ public class MetricsIntegrationTest {
final int numberOfRemovedMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 18 : 0;
final int numberOfModifiedProcessMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 18 : 4;
final int numberOfModifiedForwardMetrics = StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 8 : 0;
final int numberOfSourceNodes = 4;
final int numberOfTerminalNodes = 4;
checkMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, numberOfRemovedMetrics);
@ -600,6 +606,10 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricProcessor, DESTROY_TOTAL, numberOfRemovedMetrics);
checkMetricByName(listMetricProcessor, FORWARD_TOTAL, numberOfModifiedForwardMetrics);
checkMetricByName(listMetricProcessor, FORWARD_RATE, numberOfModifiedForwardMetrics);
checkMetricByName(listMetricProcessor, E2E_LATENCY_MIN, numberOfSourceNodes + numberOfTerminalNodes);
checkMetricByName(listMetricProcessor, E2E_LATENCY_MAX, numberOfSourceNodes + numberOfTerminalNodes);
checkMetricByName(listMetricProcessor, E2E_LATENCY_P99, numberOfSourceNodes + numberOfTerminalNodes);
checkMetricByName(listMetricProcessor, E2E_LATENCY_P90, numberOfSourceNodes + numberOfTerminalNodes);
}
private void checkKeyValueStoreMetrics(final String group0100To24,

View File

@ -450,6 +450,8 @@ public class ActiveTaskCreatorTest {
expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
expect(sourceNode.getTimestampExtractor()).andReturn(mock(TimestampExtractor.class)).anyTimes();
expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
replay(builder, stateDirectory, topology, sourceNode);
activeTaskCreator = new ActiveTaskCreator(

View File

@ -58,6 +58,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -135,6 +136,19 @@ public class ProcessorTopologyTest {
assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3"));
}
@Test
public void shouldGetTerminalNodes() {
topology.addSource("source-1", "topic-1");
topology.addSource("source-2", "topic-2", "topic-3");
topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
topology.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
topology.addSink("sink-1", "topic-3", "processor-1");
final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
assertThat(processorTopology.terminalNodes(), equalTo(mkSet("processor-2", "sink-1")));
}
@Test
public void testDrivingSimpleTopology() {
final int partition = 10;

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@ -83,6 +84,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@ -133,11 +135,12 @@ public class StreamTaskTest {
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0);
private final MockTime time = new MockTime();
private MockTime time = new MockTime();
private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
private StateDirectory stateDirectory;
private StreamTask task;
@ -420,6 +423,99 @@ public class StreamTaskTest {
assertThat(metric.metricValue(), equalTo(1.0d));
}
@Test
public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
time = new MockTime(0L, 0L, 0L);
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
final String sourceNode = source1.name();
final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
// e2e latency = 100
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
task.process(100L);
assertThat(maxMetric.metricValue(), equalTo(100d));
}
@Test
public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
time = new MockTime(0L, 0L, 0L);
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
final String terminalNode = processorStreamTime.name();
final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
// e2e latency = 100
time.setCurrentTimeMs(100L);
task.maybeRecordE2ELatency(0L, terminalNode);
assertThat(maxMetric.metricValue(), equalTo(100d));
}
@Test
public void shouldRecordE2ELatencyMinAndMax() {
time = new MockTime(0L, 0L, 0L);
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
final String sourceNode = source1.name();
final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
assertThat(minMetric.metricValue(), equalTo(Double.NaN));
assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
// e2e latency = 10
time.setCurrentTimeMs(10L);
task.maybeRecordE2ELatency(0L, sourceNode);
assertThat(minMetric.metricValue(), equalTo(10d));
assertThat(maxMetric.metricValue(), equalTo(10d));
// e2e latency = 15
time.setCurrentTimeMs(25L);
task.maybeRecordE2ELatency(10L, sourceNode);
assertThat(minMetric.metricValue(), equalTo(10d));
assertThat(maxMetric.metricValue(), equalTo(15d));
// e2e latency = 25
time.setCurrentTimeMs(30L);
task.maybeRecordE2ELatency(5L, sourceNode);
assertThat(minMetric.metricValue(), equalTo(10d));
assertThat(maxMetric.metricValue(), equalTo(25d));
// e2e latency = 20
time.setCurrentTimeMs(40L);
task.maybeRecordE2ELatency(35L, sourceNode);
assertThat(minMetric.metricValue(), equalTo(5d));
assertThat(maxMetric.metricValue(), equalTo(25d));
}
@Test
public void shouldRecordE2ELatencyPercentiles() {
time = new MockTime(0L, 0L, 0L);
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
final String sourceNode = source1.name();
final Metric p99Metric = getProcessorMetric("record-e2e-latency", "%s-p99", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
final Metric p90Metric = getProcessorMetric("record-e2e-latency", "%s-p90", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
for (int i = 0; i < 100; i++) {
time.setCurrentTimeMs(i);
task.maybeRecordE2ELatency(0L, sourceNode);
}
assertEquals((double) p99Metric.metricValue(), 99d, 5.0);
assertEquals((double) p90Metric.metricValue(), 90d, 5.0);
}
@Test
public void shouldConstructMetricsWithBuiltInMetricsVersion0100To24() {
testMetrics(StreamsConfig.METRICS_0100_TO_24);
@ -547,6 +643,28 @@ public class StreamTaskTest {
));
}
private Metric getProcessorMetric(final String operation,
final String nameFormat,
final String taskId,
final String processorNodeId,
final String builtInMetricsVersion) {
return getMetricByNameFilterByTags(
metrics.metrics(),
String.format(nameFormat, operation),
"stream-processor-node-metrics",
mkMap(
mkEntry("task-id", taskId),
mkEntry("processor-node-id", processorNodeId),
mkEntry(
StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG
: THREAD_ID_TAG_0100_TO_24,
Thread.currentThread().getName()
)
)
);
}
@Test
public void shouldPauseAndResumeBasedOnBufferedRecords() {
task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
import org.apache.kafka.streams.kstream.internals.metrics.TaskMetricsTest;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

View File

@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
@ -260,6 +261,38 @@ public class ProcessorNodeMetricsTest {
}
}
@Test
public void shouldGetRecordE2ELatencySensor() {
final String operation = "record-e2e-latency";
final String recordE2ELatencyMinDescription =
"The minimum end-to-end latency of a record, measured by comparing the record timestamp with the "
+ "system time when it has been fully processed by the node";
final String recordE2ELatencyMaxDescription =
"The maximum end-to-end latency of a record, measured by comparing the record timestamp with the "
+ "system time when it has been fully processed by the node";
final String recordE2ELatencyP99Description =
"The 99th percentile end-to-end latency of a record, measured by comparing the record timestamp with the "
+ "system time when it has been fully processed by the node";
final String recordE2ELatencyP90Description =
"The 90th percentile end-to-end latency of a record, measured by comparing the record timestamp with the "
+ "system time when it has been fully processed by the node";
expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
.andReturn(expectedSensor);
expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
expectedSensor,
PROCESSOR_NODE_LEVEL_GROUP,
tagMap,
operation,
recordE2ELatencyMinDescription,
recordE2ELatencyMaxDescription,
recordE2ELatencyP99Description,
recordE2ELatencyP90Description
);
verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
}
private void shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(final String metricNamePrefix,
final String descriptionOfRate,
final String descriptionOfCount,

View File

@ -114,6 +114,7 @@ public class StreamsMetricsImplTest {
private final String description1 = "description number one";
private final String description2 = "description number two";
private final String description3 = "description number three";
private final String description4 = "description number four";
private final Map<String, String> clientLevelTags = mkMap(mkEntry("client-id", CLIENT_ID));
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags);
@ -997,6 +998,20 @@ public class StreamsMetricsImplTest {
assertThat(metrics.metrics().size(), equalTo(3 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddMinAndMaxAndP99AndP90MetricsToSensor() {
StreamsMetricsImpl
.addMinAndMaxAndP99AndP90ToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3, description4);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
verifyMetric(metricNamePrefix + "-min", description1, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", description2, valueToRecord1, valueToRecord2, valueToRecord2);
verifyMetricWithinError(metricNamePrefix + "-p99", description3, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
verifyMetricWithinError(metricNamePrefix + "-p90", description4, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
assertThat(metrics.metrics().size(), equalTo(4 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
@ -1014,10 +1029,10 @@ public class StreamsMetricsImplTest {
}
private void verifyMetric(final String name,
final String description,
final double valueToRecord1,
final double valueToRecord2,
final double expectedMetricValue) {
final String description,
final double valueToRecord1,
final double valueToRecord2,
final double expectedMetricValue) {
final KafkaMetric metric = metrics
.metric(new MetricName(name, group, description, tags));
assertThat(metric, is(notNullValue()));
@ -1030,6 +1045,25 @@ public class StreamsMetricsImplTest {
);
}
private void verifyMetricWithinError(final String name,
final String description,
final double valueToRecord1,
final double valueToRecord2,
final double expectedMetricValue,
final double acceptableError) {
final KafkaMetric metric = metrics
.metric(new MetricName(name, group, description, tags));
assertThat(metric, is(notNullValue()));
assertThat(metric.metricName().description(), equalTo(description));
sensor.record(valueToRecord1, time.milliseconds());
sensor.record(valueToRecord2, time.milliseconds());
assertEquals(
expectedMetricValue,
metric.measurable().measure(new MetricConfig(), time.milliseconds()),
1.0
);
}
@Test
public void shouldMeasureLatency() {
final long startTime = 6;

View File

@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.metrics;
package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import org.junit.Before;
import org.junit.Test;