MINOR: cleanup KStream JavaDocs (3/N) - groupBy[Key] (#18705)

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-30 19:52:14 -08:00 committed by GitHub
parent 0d1e7e04b2
commit 281a3c6a3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 77 additions and 136 deletions

View File

@ -942,127 +942,68 @@ public interface KStream<K, V> {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/** /**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper} * Group the records by their current key into a {@link KGroupedStream} while preserving the original values.
* and default serializers and deserializers.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}. * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data * (Co-)Grouping a stream on the record key is required before a windowing or aggregation operator can be applied
* (cf. {@link KGroupedStream}). * to the data (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the * By default, the current key is used as grouping key, but a new grouping key can be set via
* original values. * {@link #groupBy(KeyValueMapper)}.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream} * In either case, if the grouping key is {@code null}, the record will be dropped.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
* If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead.
* *
* @param keySelector a {@link KeyValueMapper} that computes a new key for grouping * <p>If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector);
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Grouped}.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
* original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an
* internally generated name.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
*
* @param keySelector a {@link KeyValueMapper} that computes a new key for grouping
* @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
* and part of the name for a repartition topic if repartitioning is required.
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector,
final Grouped<KR, V> grouped);
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and default serializers and deserializers.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data, i.e.,
* Kafka if a later operator depends on the newly selected key. * it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * the resulting {@link KGroupedStream} is correctly partitioned by the grouping key, before the downstream
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * windowing/aggregation will be applied.
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
* If the last key changing operator changed the key type, it is recommended to use
* {@link #groupByKey(org.apache.kafka.streams.kstream.Grouped)} instead.
* *
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream} * <p>This internal repartition topic will be named "${applicationId}-&lt;name&gt;-repartition",
* @see #groupBy(KeyValueMapper) * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* The number of partitions for the repartition topic is determined based on the upstream topics partition numbers.
* Furthermore, the topic will be created with infinite retention time and data will be automatically purged
* by Kafka Streams.
*
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To explicitly set key/value serdes or to customize the name of the repartition topic, use {@link #groupByKey(Grouped)}.
* For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code groupByKey()}.
*
* @return A {@link KGroupedStream} that contains the grouped records of the original {@code KStream}.
*/ */
KGroupedStream<K, V> groupByKey(); KGroupedStream<K, V> groupByKey();
/** /**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values * See {@link #groupByKey()}.
* and using the serializers as defined by {@link Grouped}.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in
* Kafka if a later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* &lt;name&gt; is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally
* generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
* *
* @param grouped the {@link Grouped} instance used to specify {@link Serdes} * <p>Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize
* and part of the name for a repartition topic if repartitioning is required. * the name of the potentially created internal repartition topic.
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper)
*/ */
KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped); KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);
/**
* Group the records of this {@code KStream} on a new key (in contrast to {@link #groupByKey()}).
* This operation is semantically equivalent to {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
*
* <p>Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* See {@link #groupByKey()} for more details about auto-repartitioning.
*
* @param keySelector
* a {@link KeyValueMapper} that computes a new key for grouping
*
* @param <KOut> the new key type of the result {@link KGroupedStream}
*/
<KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector);
/**
* See {@link #groupBy(KeyValueMapper)}.
*
* <p>Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize
* the name of the created internal repartition topic.
*/
<KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector,
final Grouped<KOut, V> grouped);
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
* serializers and deserializers. * serializers and deserializers.
@ -2283,7 +2224,7 @@ public interface KStream<K, V> {
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* *
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}. * operation and thus no output record will be added to the resulting {@code KStream}.
* <p> * <p>

View File

@ -681,32 +681,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
); );
} }
@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
return groupBy(keySelector, Grouped.with(null, valueSerde));
}
@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector,
final Grouped<KR, V> grouped) {
Objects.requireNonNull(keySelector, "keySelector can't be null");
Objects.requireNonNull(grouped, "grouped can't be null");
final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
selectKeyMapNode.keyChangingOperation(true);
builder.addGraphNode(graphNode, selectKeyMapNode);
return new KGroupedStreamImpl<>(
selectKeyMapNode.nodeName(),
subTopologySourceNodes,
groupedInternal,
true,
selectKeyMapNode,
builder);
}
@Override @Override
public KGroupedStream<K, V> groupByKey() { public KGroupedStream<K, V> groupByKey() {
return groupByKey(Grouped.with(keySerde, valueSerde)); return groupByKey(Grouped.with(keySerde, valueSerde));
@ -727,6 +701,32 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder); builder);
} }
@Override
public <KOUT> KGroupedStream<KOUT, V> groupBy(final KeyValueMapper<? super K, ? super V, KOUT> keySelector) {
return groupBy(keySelector, Grouped.with(null, valueSerde));
}
@Override
public <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector,
final Grouped<KOut, V> grouped) {
Objects.requireNonNull(keySelector, "keySelector can't be null");
Objects.requireNonNull(grouped, "grouped can't be null");
final GroupedInternal<KOut, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
selectKeyMapNode.keyChangingOperation(true);
builder.addGraphNode(graphNode, selectKeyMapNode);
return new KGroupedStreamImpl<>(
selectKeyMapNode.nodeName(),
subTopologySourceNodes,
groupedInternal,
true,
selectKeyMapNode,
builder);
}
@Override @Override
public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,