diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index f4c975848b2..fa6ed8c9a56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -18,27 +18,30 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
/**
- * {@code CogroupedKStream} is an abstraction of multiple grouped record streams of {@link KeyValue} pairs.
- *
- * It is an intermediate representation after a grouping of {@link KStream}s, before the
- * aggregations are applied to the new partitions resulting in a {@link KTable}.
- *
- * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
- * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
+ * {@code CogroupedKStream} is an abstraction of one or more {@link KGroupedStream grouped record streams} of
+ * {@link Record key-value} pairs.
*
- * @param Type of keys
- * @param Type of values after agg
+ * A {@code CogroupedKStream} can be either windowed by applying {@code windowedBy(...)} operation,
+ * or can be {@link #aggregate(Initializer) aggregated} into a {@link KTable}.
+ *
+ *
A {@code CogroupedKStream} is initialized from a single
+ * {@link KGroupedStream#cogroup(Aggregator) grouped record stream}, and can be combined with one or more other
+ * {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams},
+ * before windowing or aggregation is applied.
+ *
+ * @param the key type of this co-grouped stream
+ * @param the result value type of the applied aggregation
*/
-public interface CogroupedKStream {
+public interface CogroupedKStream {
/**
* Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}.
@@ -63,8 +66,8 @@ public interface CogroupedKStream {
*
* @return a {@code CogroupedKStream}
*/
- CogroupedKStream cogroup(final KGroupedStream groupedStream,
- final Aggregator super K, ? super V, VAgg> aggregator);
+ CogroupedKStream cogroup(final KGroupedStream groupedStream,
+ final Aggregator super K, ? super V, VOut> aggregator);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -116,7 +119,7 @@ public interface CogroupedKStream {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable aggregate(final Initializer initializer);
+ KTable aggregate(final Initializer initializer);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -170,7 +173,7 @@ public interface CogroupedKStream {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable aggregate(final Initializer initializer,
+ KTable aggregate(final Initializer initializer,
final Named named);
/**
@@ -224,8 +227,8 @@ public interface CogroupedKStream {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable aggregate(final Initializer initializer,
- final Materialized> materialized);
+ KTable aggregate(final Initializer initializer,
+ final Materialized> materialized);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -281,9 +284,9 @@ public interface CogroupedKStream {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable aggregate(final Initializer initializer,
+ KTable aggregate(final Initializer initializer,
final Named named,
- final Materialized> materialized);
+ final Materialized> materialized);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform windowed
@@ -296,7 +299,7 @@ public interface CogroupedKStream {
*
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
- TimeWindowedCogroupedKStream windowedBy(final Windows windows);
+ TimeWindowedCogroupedKStream windowedBy(final Windows windows);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
@@ -307,7 +310,7 @@ public interface CogroupedKStream {
*
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
- TimeWindowedCogroupedKStream windowedBy(final SlidingWindows windows);
+ TimeWindowedCogroupedKStream windowedBy(final SlidingWindows windows);
/**
* Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session
@@ -318,6 +321,6 @@ public interface CogroupedKStream {
*
* @return an instance of {@link SessionWindowedCogroupedKStream}
*/
- SessionWindowedCogroupedKStream windowedBy(final SessionWindows windows);
+ SessionWindowedCogroupedKStream windowedBy(final SessionWindows windows);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 669511dc7f4..9f6d020baf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -22,24 +22,25 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
/**
- * {@code KGroupedStream} is an abstraction of a grouped record stream of {@link KeyValue} pairs.
- * It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original
- * {@link KStream} records.
- *
- * It is an intermediate representation after a grouping of a {@link KStream} before an aggregation is applied to the
- * new partitions resulting in a {@link KTable}.
- *
- * A {@code KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or
+ * {@code KGroupedStream} is an abstraction of a grouped record stream of {@link Record key-value} pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply a (windowed) aggregation operation
+ * on the original {@link KStream} records.
+ *
+ *
A {@code KGroupedStream} can be either {@link #cogroup(Aggregator) co-grouped} with other
+ * {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams}, windowed by applying
+ * {@code windowedBy(...)} operation, or can be aggregated into a {@link KTable}.
+ *
+ *
A {@code KGroupedStream} is obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or
* {@link KStream#groupBy(KeyValueMapper) groupBy(...)}.
*
- * @param Type of keys
- * @param Type of values
- * @see KStream
+ * @param the key type of this grouped stream
+ * @param the value type of this grouped stream
*/
public interface KGroupedStream {
@@ -201,7 +202,7 @@ public interface KGroupedStream {
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator)}).
*
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value.
@@ -239,7 +240,7 @@ public interface KGroupedStream {
* Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given store name in {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -304,7 +305,7 @@ public interface KGroupedStream {
* Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given store name in {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -380,7 +381,7 @@ public interface KGroupedStream {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
- * count (c.f. {@link #count()}).
+ * count (cf. {@link #count()}).
*
* The default value serde from config will be used for serializing the result.
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
@@ -405,14 +406,14 @@ public interface KGroupedStream {
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
- * @param the value type of the resulting {@link KTable}
+ * @param the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as
* deletion for the key, and future messages of the same key coming from upstream operators
* will be handled as newly initialized value.
*/
- KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator);
+ KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator);
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -429,7 +430,7 @@ public interface KGroupedStream {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
- * count (c.f. {@link #count()}).
+ * count (cf. {@link #count()}).
*
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -466,13 +467,13 @@ public interface KGroupedStream {
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
- * @param the value type of the resulting {@link KTable}
+ * @param the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
- KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator,
- final Materialized> materialized);
+ KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator,
+ final Materialized> materialized);
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -489,7 +490,7 @@ public interface KGroupedStream {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
- * count (c.f. {@link #count()}).
+ * count (cf. {@link #count()}).
*
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -527,16 +528,16 @@ public interface KGroupedStream {
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
- * @param the value type of the resulting {@link KTable}
+ * @param the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as
* deletion for the key, and future messages of the same key coming from upstream operators
* will be handled as newly initialized value.
*/
- KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator,
- final Named named,
- final Materialized> materialized);
+ KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator,
+ final Named named,
+ final Materialized> materialized);
/**
* Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 1eb9be3a02c..dee237f7ae6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -25,18 +25,15 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
/**
- * {@code KGroupedTable} is an abstraction of a re-grouped changelog stream from a primary-keyed table,
- * usually on a different grouping key than the original primary key.
- *
- * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied to the
- * new partitions resulting in a new {@link KTable}.
- *
- * A {@code KGroupedTable} must be obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper)
- * groupBy(...)}.
+ * {@code KGroupedTable} is an abstraction of a re-grouped changelog stream from a primary-keyed table,
+ * on a different grouping key than the original primary key.
+ * It is an intermediate representation of a {@link KTable} in order to apply an aggregation operation on the original
+ * {@link KTable} records.
*
- * @param Type of keys
- * @param Type of values
- * @see KTable
+ * A {@code KGroupedTable} is obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper) groupBy(...)}.
+ *
+ * @param the (new) grouping key type of this re-grouped table
+ * @param the (new) value type of this re-grouped table
*/
public interface KGroupedTable {
@@ -192,7 +189,7 @@ public interface KGroupedTable {
* mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -266,7 +263,7 @@ public interface KGroupedTable {
* mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -341,7 +338,7 @@ public interface KGroupedTable {
* mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 73c6174b27b..72af21601ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -107,23 +107,23 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS
}
@Override
- public KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator,
- final Materialized> materialized) {
+ public KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator,
+ final Materialized> materialized) {
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
}
@Override
- public KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator,
- final Named named,
- final Materialized> materialized) {
+ public KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator,
+ final Named named,
+ final Materialized> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");
- final MaterializedInternal> materializedInternal =
+ final MaterializedInternal> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.keySerde() == null) {
@@ -131,7 +131,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
- final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
+ final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
return doAggregate(
new KStreamAggregate<>(storeFactory, initializer, aggregator),
@@ -141,8 +141,8 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS
}
@Override
- public KTable aggregate(final Initializer initializer,
- final Aggregator super K, ? super V, VR> aggregator) {
+ public KTable aggregate(final Initializer initializer,
+ final Aggregator super K, ? super V, VOut> aggregator) {
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
}