MINOR: cleanup KStream JavaDocs (7/N) - repartition/to/toTable (#18760)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Author: Matthias J. Sax, 2025-02-06 10:57:54 -08:00 (committed by GitHub)
parent c1a813b740
commit bdab927a7d
1 changed file with 71 additions and 138 deletions

streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

@@ -17,14 +17,12 @@
 package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
-import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
@@ -653,185 +651,120 @@ public interface KStream<K, V> {
     /**
      * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
-     * from the auto-generated topic using default serializers, deserializers, and producer's default partitioning strategy.
-     * The number of partitions is determined based on the upstream topics partition numbers.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
+     * from the auto-generated topic.
      *
-     * @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
+     * <p>The created topic is considered an internal topic and is meant to be used only by the current
+     * Kafka Streams instance.
+     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
+     * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     * To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy,
+     * or to customize the name of the repartition topic, use {@link #repartition(Repartitioned)}.
+     *
+     * @return A {@code KStream} that contains the exact same, but repartitioned records as this {@code KStream}.
      */
     KStream<K, V> repartition();
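
As an illustration of the behavior described above (not part of the patch; topic names and the key changing step are hypothetical), a minimal sketch of an explicit repartition:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    public class RepartitionExample {
        public static Topology build() {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
            stream.selectKey((key, value) -> value)  // key changing operation
                  .repartition()                     // writes through "${applicationId}-<name>-repartition"
                  .to("output-topic");
            return builder.build();  // Topology#describe() on this lists the generated internal topic
        }
    }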
     /**
-     * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
-     * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
-     * number of partitions, and topic name part as defined by {@link Repartitioned}.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is either provided via {@link Repartitioned#as(String)} or an internally
-     * generated name, and "-repartition" is a fixed suffix.
-     *
-     * @param repartitioned the {@link Repartitioned} instance used to specify {@link Serdes},
-     *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
-     *                      part of the topic name, and number of partitions for a repartition topic.
-     * @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
+     * See {@link #repartition()}.
      */
     KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
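
A sketch of this overload, assuming hypothetical names "input-topic" and "my-name"; the Repartitioned builder sets the serdes, the topic name part, and the partition count:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class RepartitionedExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> stream = builder.stream("input-topic");
            // "my-name" yields "${applicationId}-my-name-repartition"; the serdes
            // and partition count override the defaults used by repartition().
            stream.repartition(Repartitioned.<String, Long>as("my-name")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withNumberOfPartitions(6));
        }
    }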
     /**
-     * Materialize this stream to a topic using default serializers specified in the config and producer's
-     * default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * Materialize this stream to a topic.
+     * The topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      *
-     * @param topic the topic name
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use {@link #to(String, Produced)}.
+     *
+     * @param topic
+     *        the output topic name
+     *
+     * @see #to(TopicNameExtractor)
      */
     void to(final String topic);
     /**
-     * Materialize this stream to a topic using the provided {@link Produced} instance.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     *
-     * @param topic the topic name
-     * @param produced the options to use when producing to the topic
+     * See {@link #to(String)}.
      */
     void to(final String topic,
             final Produced<K, V> produced);
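
Both overloads side by side, as a sketch with hypothetical topic names; the Produced variant is the one the new docs point to for explicit serdes:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class ToExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> stream = builder.stream("input-topic");
            // Default serdes from the application config:
            stream.to("output-topic");
            // Explicit serdes (and optionally a custom partitioner) via Produced:
            stream.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }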
     /**
-     * Dynamically materialize this stream to topics using default serializers specified in the config and producer's
-     * default partitioning strategy.
-     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
+     * Materialize the records of this stream to different topics.
+     * The provided {@link TopicNameExtractor} is applied to each input record to compute the output topic name.
+     * All topics should be manually created before they are used (i.e., before the Kafka Streams application is started).
      *
-     * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use {@link #to(TopicNameExtractor, Produced)}.
+     *
+     * @param topicExtractor
+     *        the extractor to determine the name of the Kafka topic to write to for each record
+     *
+     * @see #to(String)
      */
     void to(final TopicNameExtractor<K, V> topicExtractor);
     /**
-     * Dynamically materialize this stream to topics using the provided {@link Produced} instance.
-     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
-     *
-     * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
-     * @param produced the options to use when producing to the topic
+     * See {@link #to(TopicNameExtractor)}.
      */
     void to(final TopicNameExtractor<K, V> topicExtractor,
             final Produced<K, V> produced);
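
A sketch of dynamic routing with a TopicNameExtractor lambda; the routing rule and the topic names ("orders", "events") are made up for illustration, and both target topics must exist before the application starts:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.processor.TopicNameExtractor;

    public class DynamicRoutingExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> stream = builder.stream("input-topic");
            // TopicNameExtractor#extract(key, value, recordContext) is called per record:
            final TopicNameExtractor<String, String> extractor =
                (key, value, recordContext) -> key.startsWith("order-") ? "orders" : "events";
            stream.to(extractor);
        }
    }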
     /**
      * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
-     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
-     * <p>
-     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
-     * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
+     * The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of
+     * this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now
+     * (cf. {@link KStream} vs {@link KTable}). The resulting {@link KTable} is essentially a changelog stream.
+     * To "upsert" the records of this stream into a materialized {@link KTable} (i.e., into a state store),
+     * use {@link #toTable(Materialized)}.
      *
-     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     * <p>Note that {@code null} keys are not supported by {@code KTables} and records with {@code null} key will be dropped.
+     *
+     * <p>If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or {@link #process(ProcessorSupplier, String...)})
+     * Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in
+     * Kafka and write and re-read the data via this topic such that the resulting {@link KTable} is correctly
+     * partitioned by its key.
+     *
+     * <p>This internal repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
+     * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>Note: If the result {@link KTable} is materialized, it is not possible to apply
+     * {@link StreamsConfig#REUSE_KTABLE_SOURCE_TOPICS "source topic optimization"}, because
+     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in case of
+     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     * To customize the name of the repartition topic, use {@link #toTable(Named)}.
+     * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code toTable()}.
+     *
+     * @return A {@link KTable} that contains the same records as this {@code KStream}.
      */
     KTable<K, V> toTable();
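
A sketch of the stream-to-table reinterpretation the new docs describe; the topic name is hypothetical. Each record becomes the latest update for its key, and null-key records are dropped as noted above:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class ToTableExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Events on "profile-updates" are facts; toTable() re-interprets them
            // as per-key updates, so the table holds the latest value per key.
            final KStream<String, String> updates = builder.stream("profile-updates");
            final KTable<String, String> latest = updates.toTable();
        }
    }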
     /**
-     * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
-     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
-     * <p>
-     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
-     * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
-     *
-     * @param named a {@link Named} config used to name the processor in the topology
-     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     * See {@link #toTable()}.
      */
     KTable<K, V> toTable(final Named named);
     /**
-     * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
-     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
-     * <p>
-     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
-     * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
-     *
-     * @param materialized an instance of {@link Materialized} used to describe how the state store of the
-     *                     resulting table should be materialized.
-     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     * See {@link #toTable()}.
      */
     KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
     /**
-     * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
-     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
-     * <p>
-     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
-     * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
-     *
-     * @param named a {@link Named} config used to name the processor in the topology
-     * @param materialized an instance of {@link Materialized} used to describe how the state store of the
-     *                     resulting table should be materialized.
-     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     * See {@link #toTable()}.
      */
     KTable<K, V> toTable(final Named named,
                          final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
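
Finally, a sketch combining the Named and Materialized overloads; the processor and store names here are hypothetical. Materializing "upserts" the records into a queryable state store backed by a changelog topic:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Named;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MaterializedToTableExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> stream = builder.stream("input-topic");
            final KTable<String, Long> table = stream.toTable(
                Named.as("to-table"),  // names the processor in the topology
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("latest-values-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));
        }
    }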