KAFKA-4393: Improve invalid/negative TS handling

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang

Closes #2117 from mjsax/kafka-4393-improveInvalidTsHandling
Authored by Matthias J. Sax on 2016-12-09 16:17:36 -08:00, committed by Guozhang Wang
parent 7f8edbc8e8
commit 9bed8fbcfc
24 changed files with 617 additions and 76 deletions

View File

@@ -27,6 +27,7 @@
     <allow pkg="javax.management" />
     <allow pkg="org.slf4j" />
     <allow pkg="org.junit" />
+    <allow pkg="org.hamcrest" />
     <allow pkg="org.easymock" />
     <allow pkg="org.powermock" />
     <allow pkg="java.security" />
@@ -151,7 +152,6 @@
     <allow pkg="scala" />
     <allow pkg="scala.collection" />
     <allow pkg="org.I0Itec.zkclient" />
-    <allow pkg="org.hamcrest" />
   </subpackage>
   <subpackage name="state">

View File

@@ -26,6 +26,19 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
     Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
 </ul>
+<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
+<p><b>For a rolling upgrade:</b></p>
+<ol>
+    <li>Upgrading a Kafka Streams application:
+        <ul>
+            <li>You need to recompile your code. Just swapping the jar file will not work and will break your application.</li>
+            <li>If you use a custom timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed (see the sketch below).</li>
+        </ul>
+    </li>
+</ol>
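<p>For illustration, a minimal sketch of such a migration for a hypothetical payload-based extractor (the class and fallback logic are made up, not part of this commit); the old single-argument <code>extract(record)</code> becomes the new two-argument variant:</p>
<pre>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord&lt;Object, Object&gt; record, final long previousTimestamp) {
        // extract the timestamp from the payload as before; the new previousTimestamp
        // parameter (latest valid timestamp of the record's partition, or -1 if unknown)
        // can serve as a fallback for records without a usable timestamp
        if (record.value() instanceof Long) {
            return (Long) record.value();
        }
        return previousTimestamp;
    }
}
</pre>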
<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4> <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade. However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.

View File

@@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         if (record.value() instanceof PageViewTypedDemo.PageView) {
             return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }

View File

@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig {
                     REPLICATION_FACTOR_DOC)
             .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
-                    ConsumerRecordTimestampExtractor.class.getName(),
+                    FailOnInvalidTimestamp.class.getName(),
                     Importance.MEDIUM,
                     TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(PARTITION_GROUPER_CLASS_CONFIG,

View File

@@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
*
* Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
* via this timestamp extractor.
*
* If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
* <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
* this extractor effectively provides <i>ingestion-time</i> semantics.
*
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*/
public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record) {
return record.timestamp();
}
}

View File

@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Retrieves embedded metadata timestamps from Kafka messages.
* If a record has a negative (invalid) timestamp value, an error handler method is called.
* <p>
* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
* 0.10+ Kafka message format.
* <p>
* Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
* {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
* this extractor effectively provides <i>event-time</i> semantics.
* If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
* using this extractor effectively provides <i>ingestion-time</i> semantics.
* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*
* @see FailOnInvalidTimestamp
* @see LogAndSkipOnInvalidTimestamp
* @see UsePreviousTimeOnInvalidTimestamp
* @see WallclockTimestampExtractor
*/
abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
/**
* Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
*
* @param record a data record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the embedded metadata timestamp of the given {@link ConsumerRecord}
*/
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
final long timestamp = record.timestamp();
if (timestamp < 0) {
return onInvalidTimestamp(record, timestamp, previousTimestamp);
}
return timestamp;
}
/**
* Called if no valid timestamp is embedded in the record metadata.
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
*/
public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp);
}
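Because ExtractRecordMetadataTimestamp is package-private, an application that wants its own invalid-timestamp policy would implement TimestampExtractor directly, following the same pattern as above. A minimal, hypothetical sketch (class name and wall-clock fallback are illustrative, not part of this commit):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class UseWallclockOnInvalidTimestamp implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long timestamp = record.timestamp();
        if (timestamp < 0) {
            // custom fallback policy: substitute the current wall clock time
            // instead of failing, skipping, or reusing the previous valid timestamp
            return System.currentTimeMillis();
        }
        return timestamp;
    }
}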

View File

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
/**
* Retrieves embedded metadata timestamps from Kafka messages.
* If a record has a negative (invalid) timestamp value, this extractor raises an exception.
* <p>
* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
* 0.10+ Kafka message format.
* <p>
* Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
* {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
* this extractor effectively provides <i>event-time</i> semantics.
* If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
* using this extractor effectively provides <i>ingestion-time</i> semantics.
* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*
* @see LogAndSkipOnInvalidTimestamp
* @see UsePreviousTimeOnInvalidTimestamp
* @see WallclockTimestampExtractor
*/
public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
/**
* Raises an exception on every call.
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return nothing; always raises an exception
* @throws StreamsException on every invocation
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp)
throws StreamsException {
throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
"Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
"or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
"Use a different TimestampExtractor to process this data.");
}
}

View File

@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Retrieves embedded metadata timestamps from Kafka messages.
* If a record has a negative (invalid) timestamp value the timestamp is returned as-is;
* in addition, a WARN message is logged in your application.
* Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed.
* <p>
* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
* 0.10+ Kafka message format.
* <p>
* Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
* {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
* this extractor effectively provides <i>event-time</i> semantics.
* If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
* using this extractor effectively provides <i>ingestion-time</i> semantics.
* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*
* @see FailOnInvalidTimestamp
* @see UsePreviousTimeOnInvalidTimestamp
* @see WallclockTimestampExtractor
*/
public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
private static final Logger log = LoggerFactory.getLogger(LogAndSkipOnInvalidTimestamp.class);
/**
* Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is,
* which ultimately causes the record to be skipped and not to be processed.
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the originally extracted timestamp of the record
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp) {
log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
return recordTimestamp;
}
}
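To opt into this log-and-skip behavior (or any other extractor) instead of the new FailOnInvalidTimestamp default, the extractor class is set via StreamsConfig. A minimal sketch; the application id and bootstrap servers are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// override the default FailOnInvalidTimestamp with the log-and-skip variant
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());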

View File

@@ -27,17 +27,19 @@ import org.apache.kafka.streams.kstream.KTable;
 public interface TimestampExtractor {
     /**
-     * Extracts a timestamp from a record.
+     * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp.
+     * Returning a negative timestamp will cause the record not to be processed but rather silently skipped.
      * <p>
      * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC.
-     *
+     * <p>
      * It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s
      * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in
      * excessive log rolling and therefore broker performance degradation.
      *
      *
      * @param record a data record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the timestamp of the record
      */
-    long extract(ConsumerRecord<Object, Object> record);
+    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
 }
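The skip-on-negative contract can also be used deliberately for filtering. A hypothetical sketch of an extractor that drops all records older than a fixed cut-off (class name and cut-off value are illustrative, not part of this commit):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class DropOldRecordsExtractor implements TimestampExtractor {
    // records with a timestamp before 2016-01-01 UTC are dropped; placeholder cut-off
    private static final long CUT_OFF = 1451606400000L;

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long timestamp = record.timestamp();
        // a negative return value causes Streams to silently skip the record
        return timestamp < CUT_OFF ? -1L : timestamp;
    }
}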

View File

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
/**
* Retrieves embedded metadata timestamps from Kafka messages.
* If a record has a negative (invalid) timestamp, a new timestamp will be inferred from the current stream-time.
* <p>
* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
* 0.10+ Kafka message format.
* <p>
* Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
* {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
* this extractor effectively provides <i>event-time</i> semantics.
* If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
* using this extractor effectively provides <i>ingestion-time</i> semantics.
* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*
* @see FailOnInvalidTimestamp
* @see LogAndSkipOnInvalidTimestamp
* @see WallclockTimestampExtractor
*/
public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
/**
* Returns the current stream-time as new timestamp for the record.
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the provided latest extracted valid timestamp as new timestamp for the record
* @throws StreamsException if latest extracted valid timestamp is unknown
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp)
throws StreamsException {
if (previousTimestamp < 0) {
throw new StreamsException("Could not infer new timestamp for input record " + record
+ " because latest extracted valid timestamp is unknown.");
}
return previousTimestamp;
}
}

View File

@@ -21,22 +21,26 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 /**
  * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}.
- *
+ * <p>
  * Using this extractor effectively provides <i>processing-time</i> semantics.
- *
- * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
+ * <p>
+ * If you need <i>event-time</i> semantics, use {@link FailOnInvalidTimestamp} with
  * built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
  */
 public class WallclockTimestampExtractor implements TimestampExtractor {
     /**
      * Return the current wall clock time as timestamp.
      *
      * @param record a data record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         return System.currentTimeMillis();
     }
 }

View File

@@ -102,13 +102,14 @@ public class RecordQueue {
                 rawRecord.checksum(),
                 rawRecord.serializedKeySize(),
                 rawRecord.serializedValueSize(), key, value);
-            long timestamp = timestampExtractor.extract(record);
+            long timestamp = timestampExtractor.extract(record, timeTracker.get());
             log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);
-            // validate that timestamp must be non-negative
-            if (timestamp < 0)
-                throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+            // drop message if TS is invalid, i.e., negative
+            if (timestamp < 0) {
+                continue;
+            }
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);

View File

@@ -73,10 +73,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         final long timestamp = context.timestamp();
         if (timestamp < 0) {
-            throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
-                "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
-                "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
-                "Use a different TimestampExtractor to process this data.");
+            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
         }
         try {

View File

@@ -123,22 +123,27 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
     /**
-     * Adds records to queues
+     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+     * and not added to the queue for processing.
      *
      * @param partition the partition
     * @param records the records
+     * @return the number of added records
      */
     @SuppressWarnings("unchecked")
-    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        int queueSize = partitionGroup.addRawRecords(partition, records);
+    public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        final int oldQueueSize = partitionGroup.numBuffered();
+        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
-        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, queueSize);
+        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
         // if after adding these records, its partition queue's buffered size has been
         // increased beyond the threshold, we can then pause the consumption for this partition
-        if (queueSize > this.maxBufferedSize) {
+        if (newQueueSize > this.maxBufferedSize) {
             consumer.pause(singleton(partition));
         }
+        return newQueueSize - oldQueueSize;
     }
     /**

View File

@@ -549,10 +549,12 @@ public class StreamThread extends Thread {
                 throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
             if (!records.isEmpty()) {
+                int numAddedRecords = 0;
                 for (TopicPartition partition : records.partitions()) {
                     StreamTask task = activeTasksByPartition.get(partition);
-                    task.addRecords(partition, records.records(partition));
+                    numAddedRecords += task.addRecords(partition, records.records(partition));
                 }
+                sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
                 polledRecords = true;
             } else {
                 polledRecords = false;
@@ -1020,6 +1022,7 @@ public class StreamThread extends Thread {
         final Sensor punctuateTimeSensor;
         final Sensor taskCreationSensor;
         final Sensor taskDestructionSensor;
+        final Sensor skippedRecordsSensor;
         public StreamsMetricsImpl(Metrics metrics) {
             this.metrics = metrics;
@@ -1052,6 +1055,9 @@ public class StreamThread extends Thread {
             this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
             this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+            this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records");
+            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count()));
         }
         @Override
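The new sensor surfaces through the regular metrics interface of a running application. A minimal sketch of reading it, assuming the "skipped-records-count" metric name registered above (Metric#value() as available in this release):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public static void printSkippedRecords(final KafkaStreams streams) {
    // KafkaStreams#metrics() exposes all client-level metrics, including the new sensor
    for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
        if ("skipped-records-count".equals(entry.getKey().name())) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().value());
        }
    }
}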

View File

@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
import org.junit.Test;
public class FailOnInvalidTimestampTest extends TimestampExtractorTest {
@Test
public void extractMetadataTimestamp() {
testExtractMetadataTimestamp(new FailOnInvalidTimestamp());
}
@Test(expected = StreamsException.class)
public void failOnInvalidTimestamp() {
final TimestampExtractor extractor = new FailOnInvalidTimestamp();
extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42);
}
}

View File

@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest {
@Test
public void extractMetadataTimestamp() {
testExtractMetadataTimestamp(new LogAndSkipOnInvalidTimestamp());
}
@Test
public void logAndSkipOnInvalidTimestamp() {
final long invalidMetadataTimestamp = -42;
final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp();
final long timestamp = extractor.extract(
new ConsumerRecord<>(
"anyTopic",
0,
0,
invalidMetadataTimestamp,
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
0,
null,
null),
0
);
assertThat(timestamp, is(invalidMetadataTimestamp));
}
}

View File

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
class TimestampExtractorTest {
void testExtractMetadataTimestamp(TimestampExtractor extractor) {
final long metadataTimestamp = 42;
final long timestamp = extractor.extract(
new ConsumerRecord<>(
"anyTopic",
0,
0,
metadataTimestamp,
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
0,
null,
null),
0
);
assertThat(timestamp, is(metadataTimestamp));
}
}

View File

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class UsePreviousTimeOnInvalidTimestampTest extends TimestampExtractorTest {
@Test
public void extractMetadataTimestamp() {
testExtractMetadataTimestamp(new UsePreviousTimeOnInvalidTimestamp());
}
@Test
public void usePreviousTimeOnInvalidTimestamp() {
final long previousTime = 42;
final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp();
final long timestamp = extractor.extract(
new ConsumerRecord<>("anyTopic", 0, 0, null, null),
previousTime
);
assertThat(timestamp, is(previousTime));
}
}

View File

@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class WallclockTimestampExtractorTest {
@Test
public void extractSystemTimestamp() {
final TimestampExtractor extractor = new WallclockTimestampExtractor();
final long before = System.currentTimeMillis();
final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42);
final long after = System.currentTimeMillis();
assertThat(timestamp, is(new InBetween(before, after)));
}
private static class InBetween extends BaseMatcher<Long> {
private final long before;
private final long after;
public InBetween(long before, long after) {
this.before = before;
this.after = after;
}
@Override
public boolean matches(Object item) {
final long timestamp = (Long) item;
return before <= timestamp && timestamp <= after;
}
@Override
public void describeMismatch(Object item, Description mismatchDescription) {}
@Override
public void describeTo(Description description) {}
}
}

View File

@@ -17,10 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +45,10 @@ import org.junit.Test;
 import java.io.File;
 import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 public class ProcessorTopologyTest {
     private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
@@ -404,7 +404,7 @@ public class ProcessorTopologyTest {
     public static class CustomTimestampExtractor implements TimestampExtractor {
         @Override
-        public long extract(ConsumerRecord<Object, Object> record) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
             return timestamp;
         }
     }

View File

@@ -29,6 +29,8 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -136,4 +138,24 @@ public class RecordQueueTest {
         queue.addRawRecords(records, timestampExtractor);
     }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnNegativeTimestamp() {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        queue.addRawRecords(records, new FailOnInvalidTimestamp());
+    }
+
+    @Test
+    public void shouldDropOnNegativeTimestamp() {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp());
+
+        assertEquals(0, queue.size());
+    }
 }

View File

@@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor {
     private final long base = SmokeTestUtil.START_TIME;
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         switch (record.topic()) {
             case "data":
                 return base + (Integer) record.value();

View File

@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class MockTimestampExtractor implements TimestampExtractor {
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         return record.offset();
     }
 }