diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 304ce8bb709..b84ea0f1a9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -17,14 +17,12 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ConnectedStoreProvider; -import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; @@ -653,185 +651,120 @@ public interface KStream { /** * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream} - * from the auto-generated topic using default serializers, deserializers, and producer's default partitioning strategy. - * The number of partitions is determined based on the upstream topics partition numbers. - *

- * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. - * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. - * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * from the auto-generated topic. * - * @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. + *

The created topic is considered an internal topic and is meant to be used only by the current + * Kafka Streams instance. + * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. + * Furthermore, the topic will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy, + * or to customize the name of the repartition topic, use {@link #repartition(Repartitioned)}. + * + * @return A {@code KStream} that contains the exact same, but repartitioned records as this {@code KStream}. */ KStream repartition(); /** - * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream} - * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner}, - * number of partitions, and topic name part as defined by {@link Repartitioned}. - *

- * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. - * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. - * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is either provided via {@link Repartitioned#as(String)} or an internally - * generated name, and "-repartition" is a fixed suffix. - * - * @param repartitioned the {@link Repartitioned} instance used to specify {@link Serdes}, - * {@link StreamPartitioner} which determines how records are distributed among partitions of the topic, - * part of the topic name, and number of partitions for a repartition topic. - * @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. + * See {@link #repartition()}. */ KStream repartition(final Repartitioned repartitioned); /** - * Materialize this stream to a topic using default serializers specified in the config and producer's - * default partitioning strategy. - * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * Materialize this stream to a topic. + * The topic should be manually created before it is used (i.e., before the Kafka Streams application is * started). * - * @param topic the topic name + *

To explicitly set key/value serdes or the partitioning strategy, use {@link #to(String, Produced)}. + * + * @param topic + * the output topic name + * + * @see #to(TopicNameExtractor) */ void to(final String topic); /** - * Materialize this stream to a topic using the provided {@link Produced} instance. - * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is - * started). - * - * @param topic the topic name - * @param produced the options to use when producing to the topic + * See {@link #to(String)}. */ void to(final String topic, final Produced produced); /** - * Dynamically materialize this stream to topics using default serializers specified in the config and producer's - * default partitioning strategy. - * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. + * Materialize the records of this stream to different topics. + * The provided {@link TopicNameExtractor} is applied to each input record to compute the output topic name. + * All topics should be manually created before they are used (i.e., before the Kafka Streams application is started). * - * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record + *

To explicitly set key/value serdes or the partitioning strategy, use {@link #to(TopicNameExtractor, Produced)}. + * + * @param topicExtractor + * the extractor to determine the name of the Kafka topic to write to for each record + * + * @see #to(String) */ void to(final TopicNameExtractor topicExtractor); /** - * Dynamically materialize this stream to topics using the provided {@link Produced} instance. - * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. - * - * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record - * @param produced the options to use when producing to the topic + * See {@link #to(TopicNameExtractor)}. */ void to(final TopicNameExtractor topicExtractor, final Produced produced); /** * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). + * The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of + * this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now + * (cf. {@link KStream} vs {@link KTable}). The resulting {@link KTable} is essentially a changelog stream. + * To "upsert" the records of this stream into a materialized {@link KTable} (i.e., into a state store), + * use {@link #toTable(Materialized)}. * - * @return a {@link KTable} that contains the same records as this {@code KStream} + *

Note that {@code null} keys are not supported by {@code KTables} and records with {@code null} key will be dropped. + * + *

If a key-changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, + * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or {@link #process(ProcessorSupplier, String...)}), + * Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in + * Kafka and write and re-read the data via this topic such that the resulting {@link KTable} is correctly + * partitioned by its key. + * + *

This internal repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. + * Furthermore, the topic will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

Note: If the result {@link KTable} is materialized, it is not possible to apply + * {@link StreamsConfig#REUSE_KTABLE_SOURCE_TOPICS "source topic optimization"}, because + * repartition topics are considered transient and do not allow recovering the resulting {@link KTable} in case of + * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To customize the name of the repartition topic, use {@link #toTable(Named)}. + * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code toTable()}. + * + * @return A {@link KTable} that contains the same records as this {@code KStream}. */ KTable toTable(); /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param named a {@link Named} config used to name the processor in the topology - * @return a {@link KTable} that contains the same records as this {@code KStream} + * See {@link #toTable()}. */ KTable toTable(final Named named); - /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param materialized an instance of {@link Materialized} used to describe how the state store of the - * resulting table should be materialized. - * @return a {@link KTable} that contains the same records as this {@code KStream} + /** + * See {@link #toTable()}. */ KTable toTable(final Materialized> materialized); /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param named a {@link Named} config used to name the processor in the topology - * @param materialized an instance of {@link Materialized} used to describe how the state store of the - * resulting table should be materialized. - * @return a {@link KTable} that contains the same records as this {@code KStream} + * See {@link #toTable()}. */ KTable toTable(final Named named, final Materialized> materialized);