KAFKA-7223: Add late-record metrics (#5742)

Add late record metrics, as specified in KIP-328

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
John Roesler 2018-10-12 11:12:51 -05:00 committed by Guozhang Wang
parent f393b2f7dd
commit 21f88a595b
10 changed files with 248 additions and 120 deletions

View File

@ -974,6 +974,7 @@ project(':streams') {
testCompile libs.junit
testCompile libs.easymock
testCompile libs.bcpkix
testCompile libs.hamcrest
testRuntimeOnly project(':streams:test-utils')
testRuntime libs.slf4jlog4j

View File

@ -57,6 +57,7 @@ versions += [
jetty: "9.4.12.v20180830",
jersey: "2.27",
jmh: "1.21",
hamcrest: "1.3",
log4j: "1.2.17",
scalaLogging: "3.9.0",
jaxb: "2.3.0",
@ -117,6 +118,7 @@ libs += [
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
junit: "junit:junit:$versions.junit",
hamcrest: "org.hamcrest:hamcrest-all:1.3",
kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",

View File

@ -16,10 +16,15 @@
*/
package org.apache.kafka.streams.kstream.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
public class Sensors {
private Sensors() {}
@ -39,4 +44,35 @@ public class Sensors {
);
return sensor;
}
public static Sensor recordLatenessSensor(final InternalProcessorContext context) {
final StreamsMetricsImpl metrics = context.metrics();
final Sensor sensor = metrics.taskLevelSensor(
context.taskId().toString(),
"record-lateness",
Sensor.RecordingLevel.DEBUG
);
final Map<String, String> tags = metrics.tagMap(
"task-id", context.taskId().toString()
);
sensor.add(
new MetricName(
"record-lateness-avg",
"stream-processor-node-metrics",
"The average observed lateness of records.",
tags),
new Avg()
);
sensor.add(
new MetricName(
"record-lateness-max",
"stream-processor-node-metrics",
"The max observed lateness of records.",
tags),
new Max()
);
return sensor;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import java.util.Collections;
import java.util.Comparator;
@ -38,6 +39,7 @@ import java.util.Set;
public class PartitionGroup {
private final Map<TopicPartition, RecordQueue> partitionQueues;
private final Sensor recordLatenessSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
@ -61,9 +63,10 @@ public class PartitionGroup {
}
}
PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
this.partitionQueues = partitionQueues;
this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
@ -95,7 +98,12 @@ public class PartitionGroup {
}
// always update the stream time to the record's timestamp yet to be processed if it is larger
streamTime = Math.max(streamTime, record.timestamp);
if (record.timestamp > streamTime) {
streamTime = record.timestamp;
recordLatenessSensor.record(0);
} else {
recordLatenessSensor.record(streamTime - record.timestamp);
}
}
}

View File

@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor;
/**
* A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
@ -234,7 +235,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
recordInfo = new PartitionGroup.RecordInfo();
partitionGroup = new PartitionGroup(partitionQueues);
partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
stateMgr.registerGlobalStateStores(topology.globalStateStores());

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
@ -55,7 +54,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -110,7 +111,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
Serdes.String(),
Serdes.Long())
.withLoggingDisabled();
.withLoggingDisabled();
if (enableCaching) {
storeBuilder.withCachingEnabled();
@ -335,9 +336,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setStreamTime(20);
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("A", "1");
context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
processor.process("A", "1");
LogCaptureAppender.unregister(appender);
final Metric dropMetric = metrics.metrics().get(new MetricName(
final MetricName dropMetric = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
@ -346,8 +349,24 @@ public class KStreamSessionWindowAggregateProcessorTest {
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
));
assertEquals(1.0, dropMetric.metricValue());
);
assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
final MetricName dropRate = new MetricName(
"late-record-drop-rate",
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "test"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
);
assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@ -42,6 +43,7 @@ import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.junit.Test;
import java.util.List;
@ -51,9 +53,10 @@ import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@ -70,7 +73,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
@ -128,7 +131,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table1 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
@ -137,7 +140,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
@ -231,8 +234,9 @@ public class KStreamWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(
MockInitializer.STRING_INIT,
@ -258,15 +262,15 @@ public class KStreamWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
)
.toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to("output");
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
)
.toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to("output");
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@ -281,17 +285,13 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
LogCaptureAppender.unregister(appender);
final MetricName metricName = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
assertLatenessMetrics(
driver,
is(7.0), // how many events get dropped
is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100.
is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8
);
assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
@ -316,59 +316,101 @@ public class KStreamWindowAggregateTest {
final String topic = "topic";
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L)))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
)
.toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to("output");
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
)
.toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to("output");
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
driver.pipeInput(recordFactory.create(topic, "k", "100", 200L));
driver.pipeInput(recordFactory.create(topic, "k", "0", 100L));
driver.pipeInput(recordFactory.create(topic, "k", "1", 101L));
driver.pipeInput(recordFactory.create(topic, "k", "2", 102L));
driver.pipeInput(recordFactory.create(topic, "k", "3", 103L));
driver.pipeInput(recordFactory.create(topic, "k", "4", 104L));
driver.pipeInput(recordFactory.create(topic, "k", "5", 105L));
driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
LogCaptureAppender.unregister(appender);
final MetricName metricName = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
);
assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
assertThat(driver.readOutput("output"), nullValue());
}
}
private void assertLatenessMetrics(final TopologyTestDriver driver,
final Matcher<Object> dropTotal,
final Matcher<Object> maxLateness,
final Matcher<Object> avgLateness) {
final MetricName dropMetric = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
);
assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal);
final MetricName dropRate = new MetricName(
"late-record-drop-rate",
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
);
assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0));
final MetricName latenessMaxMetric = new MetricName(
"record-lateness-max",
"stream-processor-node-metrics",
"The max observed lateness of records.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0")
)
);
assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
final MetricName latenessAvgMetric = new MetricName(
"record-lateness-avg",
"stream-processor-node-metrics",
"The average observed lateness of records.",
mkMap(
mkEntry("client-id", "topology-test-driver-virtual-thread"),
mkEntry("task-id", "0_0")
)
);
assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
}
private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
}

View File

@ -17,7 +17,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@ -65,7 +69,19 @@ public class PartitionGroupTest {
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)));
private final Metrics metrics = new Metrics();
private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
private final PartitionGroup group = new PartitionGroup(
mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)),
getValueSensor(metrics, lastLatenessValue)
);
private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
final Sensor lastRecordedValue = metrics.sensor(metricName.name());
lastRecordedValue.add(metricName, new Value());
return lastRecordedValue;
}
@Test
public void testTimeTracking() {
@ -90,10 +106,9 @@ public class PartitionGroupTest {
// 2:[2, 4, 6]
// st: -1 since no records was being processed yet
assertEquals(6, group.numBuffered());
assertEquals(3, group.numBuffered(partition1));
assertEquals(3, group.numBuffered(partition2));
verifyBuffered(6, 3, 3);
assertEquals(-1L, group.timestamp());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@ -104,11 +119,9 @@ public class PartitionGroupTest {
// 2:[2, 4, 6]
// st: 2
assertEquals(partition1, info.partition());
assertEquals(1L, record.timestamp);
assertEquals(5, group.numBuffered());
assertEquals(2, group.numBuffered(partition1));
assertEquals(3, group.numBuffered(partition2));
assertEquals(1L, group.timestamp());
verifyTimes(record, 1L, 1L);
verifyBuffered(5, 2, 3);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, now the time should be advanced
record = group.nextRecord(info);
@ -116,11 +129,9 @@ public class PartitionGroupTest {
// 2:[4, 6]
// st: 3
assertEquals(partition2, info.partition());
assertEquals(2L, record.timestamp);
assertEquals(4, group.numBuffered());
assertEquals(2, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(2L, group.timestamp());
verifyTimes(record, 2L, 2L);
verifyBuffered(4, 2, 2);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// add 2 more records with timestamp 2, 4 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@ -131,10 +142,9 @@ public class PartitionGroupTest {
// 1:[3, 5, 2, 4]
// 2:[4, 6]
// st: 3 (non-decreasing, so adding 2 doesn't change it)
assertEquals(6, group.numBuffered());
assertEquals(4, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
verifyBuffered(6, 4, 2);
assertEquals(2L, group.timestamp());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, time should not be advanced
record = group.nextRecord(info);
@ -142,11 +152,9 @@ public class PartitionGroupTest {
// 2:[4, 6]
// st: 4 as partition st is now {5, 4}
assertEquals(partition1, info.partition());
assertEquals(3L, record.timestamp);
assertEquals(5, group.numBuffered());
assertEquals(3, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
verifyTimes(record, 3L, 3L);
verifyBuffered(5, 3, 2);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, time should not be advanced
record = group.nextRecord(info);
@ -154,11 +162,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5 as partition st is now {5, 6}
assertEquals(partition2, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(4, group.numBuffered());
assertEquals(3, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
verifyTimes(record, 4L, 4L);
verifyBuffered(4, 3, 1);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, now time should be advanced
record = group.nextRecord(info);
@ -166,11 +172,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5
assertEquals(partition1, info.partition());
assertEquals(5L, record.timestamp);
assertEquals(3, group.numBuffered());
assertEquals(2, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(5L, group.timestamp());
verifyTimes(record, 5L, 5L);
verifyBuffered(3, 2, 1);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@ -178,11 +182,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5
assertEquals(partition1, info.partition());
assertEquals(2L, record.timestamp);
assertEquals(2, group.numBuffered());
assertEquals(1, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(5L, group.timestamp());
verifyTimes(record, 2L, 5L);
verifyBuffered(2, 1, 1);
assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@ -190,11 +192,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
assertEquals(partition1, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(1, group.numBuffered());
assertEquals(0, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(5L, group.timestamp());
verifyTimes(record, 4L, 5L);
verifyBuffered(1, 0, 1);
assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@ -202,11 +202,20 @@ public class PartitionGroupTest {
// 2:[]
// st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
assertEquals(partition2, info.partition());
assertEquals(6L, record.timestamp);
assertEquals(0, group.numBuffered());
assertEquals(0, group.numBuffered(partition1));
assertEquals(0, group.numBuffered(partition2));
assertEquals(6L, group.timestamp());
verifyTimes(record, 6L, 6L);
verifyBuffered(0, 0, 0);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
}
private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
assertEquals(recordTime, record.timestamp);
assertEquals(streamTime, group.timestamp());
}
private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
assertEquals(totalBuffered, group.numBuffered());
assertEquals(partitionOneBuffered, group.numBuffered(partition1));
assertEquals(partitionTwoBuffered, group.numBuffered(partition2));
}
}

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -50,6 +51,7 @@ public final class StreamsTestUtils {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name);
props.putAll(additional);
return props;
}

View File

@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@ -255,7 +257,13 @@ public class TopologyTestDriver implements Closeable {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
metrics = new Metrics();
final MetricConfig metricConfig = new MetricConfig()
.samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
metrics = new Metrics(metricConfig, mockWallClockTime);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
"topology-test-driver-virtual-thread"