MINOR: add upgrade guide for Kafka Streams API

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Eno Thereska

Closes #2114 from mjsax/updateDocUpgradeSection
This commit is contained in:
Matthias J. Sax 2016-11-09 15:31:11 -08:00 committed by Guozhang Wang
parent da13b53787
commit 6972d9476f
2 changed files with 78 additions and 11 deletions

View File

@ -53,6 +53,43 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
<li> Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
</ul>
<h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
<ul>
<li> Stream grouping and aggregation split into two methods:
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
<li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
<li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
</ul>
</li>
<li> Auto Repartitioning:
<ul>
<li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
<li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li>
</ul>
</li>
<li> TopologyBuilder:
<ul>
<li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li>
</ul>
</li>
<li> DSL: new parameter to specify state store names:
<ul>
<li> The new Interactive Queries feature requires to specify a store name for all source KTables and window aggregation result KTables (previous parameter "operator/window name" is now the storeName) </li>
<li> KStreamBuilder#table(String topic) changes to #topic(String topic, String storeName) </li>
<li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
<li> KGroupedStream #aggregate(), #reduce(), and #count() require additional parameter "String storeName"</li>
<li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
</ul>
</li>
<li> Windowing:
<ul>
<li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
<li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
</ul>
</li>
</ul>
<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
<ul>
<li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release

View File

@ -18,23 +18,53 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
/**
* Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
*
* Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
* Retrieves embedded metadata timestamps from Kafka messages.
* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
* 0.10+ Kafka message format.
* <p>
* Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
* via this timestamp extractor.
*
* If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
* <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
* this extractor effectively provides <i>ingestion-time</i> semantics.
*
* <p>
* If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
* {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
* this extractor effectively provides <i>event-time</i> semantics.
* If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
* using this extractor effectively provides <i>ingestion-time</i> semantics.
* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
* <p>
* If a record has a negative (invalid) timestamp value, this extractor raises an exception.
*
* @see RobustConsumerRecordTimestampExtractor
* @see InferringConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
/**
* Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
*
* @param record a data record
* @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
* @return the embedded metadata timestamp of the given {@link ConsumerRecord}
* @throws StreamsException if the embedded metadata timestamp is negative
*/
@Override
public long extract(ConsumerRecord<Object, Object> record) {
return record.timestamp();
public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime) {
final long timestamp = record.timestamp();
if (timestamp < 0) {
throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
"Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
"or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
"Use a different TimestampExtractor to process this data.");
}
return timestamp;
}
}