MINOR: cleanup top level class JavaDocs for main interfaces of Kafka Streams DSL (2/N) (#18882)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-14 13:47:23 -08:00 committed by GitHub
parent 835d8f3097
commit bcc58b4cfe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 77 additions and 76 deletions

View File

@ -18,27 +18,30 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore;
/** /**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs. * {@code CogroupedKStream} is an abstraction of one or more {@link KGroupedStream grouped record streams} of
* <p> * {@link Record key-value} pairs.
* It is an intermediate representation after a grouping of {@link KStream}s, before the
* aggregations are applied to the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
* *
* @param <K> Type of keys * <p>A {@code CogroupedKStream} can be either windowed by applying {@code windowedBy(...)} operation,
* @param <VAgg> Type of values after agg * or can be {@link #aggregate(Initializer) aggregated} into a {@link KTable}.
*
* <p>A {@code CogroupedKStream} is initialized from a single
* {@link KGroupedStream#cogroup(Aggregator) grouped record stream}, and can be combined with one or more other
* {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams},
* before windowing or aggregation is applied.
*
* @param <K> the key type of this co-grouped stream
* @param <VOut> the result value type of the applied aggregation
*/ */
public interface CogroupedKStream<K, VAgg> { public interface CogroupedKStream<K, VOut> {
/** /**
* Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}. * Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}.
@ -63,8 +66,8 @@ public interface CogroupedKStream<K, VAgg> {
* *
* @return a {@code CogroupedKStream} * @return a {@code CogroupedKStream}
*/ */
<V> CogroupedKStream<K, VAgg> cogroup(final KGroupedStream<K, V> groupedStream, <V> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, V> groupedStream,
final Aggregator<? super K, ? super V, VAgg> aggregator); final Aggregator<? super K, ? super V, VOut> aggregator);
/** /**
* Aggregate the values of records in these streams by the grouped key. * Aggregate the values of records in these streams by the grouped key.
@ -116,7 +119,7 @@ public interface CogroupedKStream<K, VAgg> {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key * represent the latest (rolling) aggregate for each key
*/ */
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer); KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
/** /**
* Aggregate the values of records in these streams by the grouped key. * Aggregate the values of records in these streams by the grouped key.
@ -170,7 +173,7 @@ public interface CogroupedKStream<K, VAgg> {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key * represent the latest (rolling) aggregate for each key
*/ */
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer, KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Named named); final Named named);
/** /**
@ -224,8 +227,8 @@ public interface CogroupedKStream<K, VAgg> {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key * represent the latest (rolling) aggregate for each key
*/ */
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer, KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized); final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
/** /**
* Aggregate the values of records in these streams by the grouped key. * Aggregate the values of records in these streams by the grouped key.
@ -281,9 +284,9 @@ public interface CogroupedKStream<K, VAgg> {
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key * represent the latest (rolling) aggregate for each key
*/ */
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer, KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Named named, final Named named,
final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized); final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
/** /**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform windowed * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform windowed
@ -296,7 +299,7 @@ public interface CogroupedKStream<K, VAgg> {
* *
* @return an instance of {@link TimeWindowedCogroupedKStream} * @return an instance of {@link TimeWindowedCogroupedKStream}
*/ */
<W extends Window> TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final Windows<W> windows); <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);
/** /**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
@ -307,7 +310,7 @@ public interface CogroupedKStream<K, VAgg> {
* *
* @return an instance of {@link TimeWindowedCogroupedKStream} * @return an instance of {@link TimeWindowedCogroupedKStream}
*/ */
TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final SlidingWindows windows); TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows);
/** /**
* Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session * Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session
@ -318,6 +321,6 @@ public interface CogroupedKStream<K, VAgg> {
* *
* @return an instance of {@link SessionWindowedCogroupedKStream} * @return an instance of {@link SessionWindowedCogroupedKStream}
*/ */
SessionWindowedCogroupedKStream<K, VAgg> windowedBy(final SessionWindows windows); SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows windows);
} }

View File

@ -22,24 +22,25 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore;
/** /**
* {@code KGroupedStream} is an abstraction of a <i>grouped</i> record stream of {@link KeyValue} pairs. * {@code KGroupedStream} is an abstraction of a <em>grouped</em> record stream of {@link Record key-value} pairs.
* It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original * It is an intermediate representation of a {@link KStream} in order to apply a (windowed) aggregation operation
* {@link KStream} records. * on the original {@link KStream} records.
* <p> *
* It is an intermediate representation after a grouping of a {@link KStream} before an aggregation is applied to the * <p>A {@code KGroupedStream} can be either {@link #cogroup(Aggregator) co-grouped} with other
* new partitions resulting in a {@link KTable}. * {@link CogroupedKStream#cogroup(KGroupedStream, Aggregator) grouped record streams}, windowed by applying
* <p> * {@code windowedBy(...)} operation, or can be aggregated into a {@link KTable}.
* A {@code KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or *
* <p>A {@code KGroupedStream} is obtained from a {@link KStream} via {@link KStream#groupByKey() groupByKey()} or
* {@link KStream#groupBy(KeyValueMapper) groupBy(...)}. * {@link KStream#groupBy(KeyValueMapper) groupBy(...)}.
* *
* @param <K> Type of keys * @param <K> the key type of this grouped stream
* @param <V> Type of values * @param <V> the value type of this grouped stream
* @see KStream
*/ */
public interface KGroupedStream<K, V> { public interface KGroupedStream<K, V> {
@ -201,7 +202,7 @@ public interface KGroupedStream<K, V> {
* Combine the values of records in this stream by the grouped key. * Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored. * Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator)}). * (cf. {@link #aggregate(Initializer, Aggregator)}).
* <p> * <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value. * aggregate and the record's value.
@ -239,7 +240,7 @@ public interface KGroupedStream<K, V> {
* Combine the value of records in this stream by the grouped key. * Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored. * Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given store name in {@code materialized}. * provided by the given store name in {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@ -304,7 +305,7 @@ public interface KGroupedStream<K, V> {
* Combine the value of records in this stream by the grouped key. * Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored. * Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). * (cf. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given store name in {@code materialized}. * provided by the given store name in {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@ -380,7 +381,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the * aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value. * {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}). * count (cf. {@link #count()}).
* <p> * <p>
* The default value serde from config will be used for serializing the result. * The default value serde from config will be used for serializing the result.
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}. * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
@ -405,14 +406,14 @@ public interface KGroupedStream<K, V> {
* *
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result * @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param <VR> the value type of the resulting {@link KTable} * @param <VOut> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as
* deletion for the key, and future messages of the same key coming from upstream operators * deletion for the key, and future messages of the same key coming from upstream operators
* will be handled as newly initialized value. * will be handled as newly initialized value.
*/ */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator); final Aggregator<? super K, ? super V, VOut> aggregator);
/** /**
* Aggregate the values of records in this stream by the grouped key. * Aggregate the values of records in this stream by the grouped key.
@ -429,7 +430,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the * aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value. * {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}). * count (cf. {@link #count()}).
* <p> * <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key. * the same key.
@ -466,13 +467,13 @@ public interface KGroupedStream<K, V> {
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result * @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable} * @param <VOut> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key * latest (rolling) aggregate for each key
*/ */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VOut> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
/** /**
* Aggregate the values of records in this stream by the grouped key. * Aggregate the values of records in this stream by the grouped key.
@ -489,7 +490,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the * aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value. * {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}). * count (cf. {@link #count()}).
* <p> * <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key. * the same key.
@ -527,16 +528,16 @@ public interface KGroupedStream<K, V> {
* @param aggregator an {@link Aggregator} that computes a new aggregate result * @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param named a {@link Named} config used to name the processor in the topology * @param named a {@link Named} config used to name the processor in the topology
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable} * @param <VOut> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as
* deletion for the key, and future messages of the same key coming from upstream operators * deletion for the key, and future messages of the same key coming from upstream operators
* will be handled as newly initialized value. * will be handled as newly initialized value.
*/ */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VOut> aggregator,
final Named named, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
/** /**
* Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations. * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.

View File

@ -25,18 +25,15 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
/** /**
* {@code KGroupedTable} is an abstraction of a <i>re-grouped changelog stream</i> from a primary-keyed table, * {@code KGroupedTable} is an abstraction of a <em>re-grouped changelog stream</em> from a primary-keyed table,
* usually on a different grouping key than the original primary key. * on a different grouping key than the original primary key.
* <p> * It is an intermediate representation of a {@link KTable} in order to apply an aggregation operation on the original
* It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied to the * {@link KTable} records.
* new partitions resulting in a new {@link KTable}.
* <p>
* A {@code KGroupedTable} must be obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper)
* groupBy(...)}.
* *
* @param <K> Type of keys * <p>A {@code KGroupedTable} is obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper) groupBy(...)}.
* @param <V> Type of values *
* @see KTable * @param <K> the (new) grouping key type of this re-grouped table
* @param <V> the (new) value type of this re-grouped table
*/ */
public interface KGroupedTable<K, V> { public interface KGroupedTable<K, V> {
@ -192,7 +189,7 @@ public interface KGroupedTable<K, V> {
* mapped} to the same key into a new instance of {@link KTable}. * mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored. * Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}. * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@ -266,7 +263,7 @@ public interface KGroupedTable<K, V> {
* mapped} to the same key into a new instance of {@link KTable}. * mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored. * Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}. * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@ -341,7 +338,7 @@ public interface KGroupedTable<K, V> {
* mapped} to the same key into a new instance of {@link KTable}. * mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored. * Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value * Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}). * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p> * <p>

View File

@ -107,23 +107,23 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
} }
@Override @Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, public <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VOut> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized); return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
} }
@Override @Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, public <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VOut> aggregator,
final Named named, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null"); Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.keySerde() == null) { if (materializedInternal.keySerde() == null) {
@ -131,7 +131,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
} }
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final KeyValueStoreMaterializer<K, VR> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal); final KeyValueStoreMaterializer<K, VOut> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
return doAggregate( return doAggregate(
new KStreamAggregate<>(storeFactory, initializer, aggregator), new KStreamAggregate<>(storeFactory, initializer, aggregator),
@ -141,8 +141,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
} }
@Override @Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, public <VOut> KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) { final Aggregator<? super K, ? super V, VOut> aggregator) {
return aggregate(initializer, aggregator, Materialized.with(keySerde, null)); return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
} }