From bcc58b4cfef222c2e54b313a1db657b5754833d3 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 14 Feb 2025 13:47:23 -0800 Subject: [PATCH] MINOR: cleanup top level class JavaDocs for main interfaces of Kafka Streams DSL (2/N) (#18882) Reviewers: Bill Bejeck --- .../streams/kstream/CogroupedKStream.java | 47 ++++++++------- .../kafka/streams/kstream/KGroupedStream.java | 59 ++++++++++--------- .../kafka/streams/kstream/KGroupedTable.java | 25 ++++---- .../kstream/internals/KGroupedStreamImpl.java | 22 +++---- 4 files changed, 77 insertions(+), 76 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java index f4c975848b2..fa6ed8c9a56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java @@ -18,27 +18,30 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; /** - * {@code CogroupedKStream} is an abstraction of multiple grouped record streams of {@link KeyValue} pairs. - *

- * It is an intermediate representation after a grouping of {@link KStream}s, before the - * aggregations are applied to the new partitions resulting in a {@link KTable}. - *

- * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via - * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}. + * {@code CogroupedKStream} is an abstraction of one or more {@link KGroupedStream grouped record streams} of + * {@link Record key-value} pairs. * - * @param Type of keys - * @param Type of values after agg + *

A {@code CogroupedKStream} can either be windowed by applying a {@code windowedBy(...)} operation, + * or be {@link #aggregate(Initializer) aggregated} into a {@link KTable}. + * + *

A {@code CogroupedKStream} is initialized from a single + * {@link KGroupedStream#cogroup(Aggregator) grouped record stream}, and can be combined with one or more other + * {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams}, + * before windowing or aggregation is applied. + * + * @param the key type of this co-grouped stream + * @param the result value type of the applied aggregation */ -public interface CogroupedKStream { +public interface CogroupedKStream { /** * Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}. @@ -63,8 +66,8 @@ public interface CogroupedKStream { * * @return a {@code CogroupedKStream} */ - CogroupedKStream cogroup(final KGroupedStream groupedStream, - final Aggregator aggregator); + CogroupedKStream cogroup(final KGroupedStream groupedStream, + final Aggregator aggregator); /** * Aggregate the values of records in these streams by the grouped key. @@ -116,7 +119,7 @@ public interface CogroupedKStream { * @return a {@link KTable} that contains "update" records with unmodified keys, and values that * represent the latest (rolling) aggregate for each key */ - KTable aggregate(final Initializer initializer); + KTable aggregate(final Initializer initializer); /** * Aggregate the values of records in these streams by the grouped key. 
@@ -170,7 +173,7 @@ public interface CogroupedKStream { * @return a {@link KTable} that contains "update" records with unmodified keys, and values that * represent the latest (rolling) aggregate for each key */ - KTable aggregate(final Initializer initializer, + KTable aggregate(final Initializer initializer, final Named named); /** @@ -224,8 +227,8 @@ public interface CogroupedKStream { * @return a {@link KTable} that contains "update" records with unmodified keys, and values that * represent the latest (rolling) aggregate for each key */ - KTable aggregate(final Initializer initializer, - final Materialized> materialized); + KTable aggregate(final Initializer initializer, + final Materialized> materialized); /** * Aggregate the values of records in these streams by the grouped key. @@ -281,9 +284,9 @@ public interface CogroupedKStream { * @return a {@link KTable} that contains "update" records with unmodified keys, and values that * represent the latest (rolling) aggregate for each key */ - KTable aggregate(final Initializer initializer, + KTable aggregate(final Initializer initializer, final Named named, - final Materialized> materialized); + final Materialized> materialized); /** * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform windowed @@ -296,7 +299,7 @@ public interface CogroupedKStream { * * @return an instance of {@link TimeWindowedCogroupedKStream} */ - TimeWindowedCogroupedKStream windowedBy(final Windows windows); + TimeWindowedCogroupedKStream windowedBy(final Windows windows); /** * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding @@ -307,7 +310,7 @@ public interface CogroupedKStream { * * @return an instance of {@link TimeWindowedCogroupedKStream} */ - TimeWindowedCogroupedKStream windowedBy(final SlidingWindows windows); + TimeWindowedCogroupedKStream windowedBy(final SlidingWindows windows); /** * Create a new {@link SessionWindowedCogroupedKStream} instance that 
can be used to perform session @@ -318,6 +321,6 @@ public interface CogroupedKStream { * * @return an instance of {@link SessionWindowedCogroupedKStream} */ - SessionWindowedCogroupedKStream windowedBy(final SessionWindows windows); + SessionWindowedCogroupedKStream windowedBy(final SessionWindows windows); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 669511dc7f4..9f6d020baf3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -22,24 +22,25 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; /** - * {@code KGroupedStream} is an abstraction of a grouped record stream of {@link KeyValue} pairs. - * It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original - * {@link KStream} records. - *

- * It is an intermediate representation after a grouping of a {@link KStream} before an aggregation is applied to the - * new partitions resulting in a {@link KTable}. - *

- * A {@code KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or + * {@code KGroupedStream} is an abstraction of a grouped record stream of {@link Record key-value} pairs. + * It is an intermediate representation of a {@link KStream} in order to apply a (windowed) aggregation operation + * on the original {@link KStream} records. + * + *

A {@code KGroupedStream} can be {@link #cogroup(Aggregator) co-grouped} with other + * {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams}, windowed by applying + * a {@code windowedBy(...)} operation, or aggregated into a {@link KTable}. + * + *

A {@code KGroupedStream} is obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or * {@link KStream#groupBy(KeyValueMapper) groupBy(...)}. * - * @param Type of keys - * @param Type of values - * @see KStream + * @param the key type of this grouped stream + * @param the value type of this grouped stream */ public interface KGroupedStream { @@ -201,7 +202,7 @@ public interface KGroupedStream { * Combine the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator)}). + * (cf. {@link #aggregate(Initializer, Aggregator)}). *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * aggregate and the record's value. @@ -239,7 +240,7 @@ public interface KGroupedStream { * Combine the value of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * provided by the given store name in {@code materialized}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -304,7 +305,7 @@ public interface KGroupedStream { * Combine the value of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * provided by the given store name in {@code materialized}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -380,7 +381,7 @@ public interface KGroupedStream { * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like - * count (c.f. {@link #count()}). + * count (cf. {@link #count()}). *

* The default value serde from config will be used for serializing the result. * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}. @@ -405,14 +406,14 @@ public interface KGroupedStream { * * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result * @param aggregator an {@link Aggregator} that computes a new aggregate result - * @param the value type of the resulting {@link KTable} + * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as * deletion for the key, and future messages of the same key coming from upstream operators * will be handled as newly initialized value. */ - KTable aggregate(final Initializer initializer, - final Aggregator aggregator); + KTable aggregate(final Initializer initializer, + final Aggregator aggregator); /** * Aggregate the values of records in this stream by the grouped key. @@ -429,7 +430,7 @@ public interface KGroupedStream { * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like - * count (c.f. {@link #count()}). + * count (cf. {@link #count()}). *

* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same key. @@ -466,13 +467,13 @@ public interface KGroupedStream { * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} + * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key */ - KTable aggregate(final Initializer initializer, - final Aggregator aggregator, - final Materialized> materialized); + KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized); /** * Aggregate the values of records in this stream by the grouped key. @@ -489,7 +490,7 @@ public interface KGroupedStream { * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like - * count (c.f. {@link #count()}). + * count (cf. {@link #count()}). *

* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same key. @@ -527,16 +528,16 @@ public interface KGroupedStream { * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param named a {@link Named} config used to name the processor in the topology * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} + * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as * deletion for the key, and future messages of the same key coming from upstream operators * will be handled as newly initialized value. */ - KTable aggregate(final Initializer initializer, - final Aggregator aggregator, - final Named named, - final Materialized> materialized); + KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Named named, + final Materialized> materialized); /** * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 1eb9be3a02c..dee237f7ae6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -25,18 +25,15 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; /** - * {@code KGroupedTable} is an abstraction of a re-grouped changelog stream from a primary-keyed table, - * usually on a different grouping key than the original primary key. - *

- * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied to the - * new partitions resulting in a new {@link KTable}. - *

- * A {@code KGroupedTable} must be obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper) - * groupBy(...)}. + * {@code KGroupedTable} is an abstraction of a re-grouped changelog stream from a primary-keyed table, + * on a different grouping key than the original primary key. + * It is an intermediate representation of a {@link KTable} in order to apply an aggregation operation on the original + * {@link KTable} records. * - * @param Type of keys - * @param Type of values - * @see KTable + *

A {@code KGroupedTable} is obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper) groupBy(...)}. + * + * @param the (new) grouping key type of this re-grouped table + * @param the (new) value type of this re-grouped table */ public interface KGroupedTable { @@ -192,7 +189,7 @@ public interface KGroupedTable { * mapped} to the same key into a new instance of {@link KTable}. * Records with {@code null} key are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * that can be queried using the provided {@code queryableStoreName}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -266,7 +263,7 @@ public interface KGroupedTable { * mapped} to the same key into a new instance of {@link KTable}. * Records with {@code null} key are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * that can be queried using the provided {@code queryableStoreName}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -341,7 +338,7 @@ public interface KGroupedTable { * mapped} to the same key into a new instance of {@link KTable}. * Records with {@code null} key are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. 
{@link #aggregate(Initializer, Aggregator, Aggregator)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 73c6174b27b..72af21601ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -107,23 +107,23 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } @Override - public KTable aggregate(final Initializer initializer, - final Aggregator aggregator, - final Materialized> materialized) { + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized) { return aggregate(initializer, aggregator, NamedInternal.empty(), materialized); } @Override - public KTable aggregate(final Initializer initializer, - final Aggregator aggregator, - final Named named, - final Materialized> materialized) { + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); Objects.requireNonNull(named, "named can't be null"); - final MaterializedInternal> materializedInternal = + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); if (materializedInternal.keySerde() == null) { @@ -131,7 +131,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); - final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal); + final KeyValueStoreMaterializer storeFactory = new 
KeyValueStoreMaterializer<>(materializedInternal); return doAggregate( new KStreamAggregate<>(storeFactory, initializer, aggregator), @@ -141,8 +141,8 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } @Override - public KTable aggregate(final Initializer initializer, - final Aggregator aggregator) { + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator) { return aggregate(initializer, aggregator, Materialized.with(keySerde, null)); }