mirror of https://github.com/apache/kafka.git
MINOR: cleanup top level class JavaDocs for main interfaces of Kafka Streams DSL (3/N) (#18883)
Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
parent
d6146644b3
commit
490ba8a8a3
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
|
|||
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StoreQueryParameters;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
|
@ -26,31 +25,9 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
|
|||
import org.apache.kafka.streams.state.TimestampedWindowStore;
|
||||
import org.apache.kafka.streams.state.WindowStore;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* {@code TimeWindowedCogroupKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
|
||||
* It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation
|
||||
* on the original {@link KGroupedStream} records resulting in a windowed {@link KTable} (a <emph>windowed</emph>
|
||||
* {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
|
||||
* <p>
|
||||
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
|
||||
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
|
||||
* <p>
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating
|
||||
* materialized view) that can be queried using the name provided in the {@link Materialized} instance.
|
||||
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
|
||||
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
|
||||
* New events are added to windows until their grace period ends (see {@link TimeWindows#ofSizeAndGrace(Duration, Duration)}).
|
||||
* <p>
|
||||
* A {@code TimeWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via
|
||||
* {@link CogroupedKStream#windowedBy(Windows)}.
|
||||
*
|
||||
* @param <K> Type of keys
|
||||
* @param <V> Type of values
|
||||
* @see KStream
|
||||
* @see KGroupedStream
|
||||
* @see CogroupedKStream
|
||||
* Same as a {@link TimeWindowedKStream}, however, for multiple co-grouped {@link KStream KStreams}.
|
||||
*/
|
||||
public interface TimeWindowedCogroupedKStream<K, V> {
|
||||
|
||||
|
|
|
@ -18,10 +18,10 @@ package org.apache.kafka.streams.kstream;
|
|||
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StoreQueryParameters;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
|
||||
import org.apache.kafka.streams.state.TimestampedWindowStore;
|
||||
import org.apache.kafka.streams.state.WindowStore;
|
||||
|
@ -29,27 +29,35 @@ import org.apache.kafka.streams.state.WindowStore;
|
|||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
|
||||
* It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is
|
||||
* applied to the new (partitioned) windows resulting in a windowed {@link KTable} (a <emph>windowed</emph>
|
||||
* {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
|
||||
* <p>
|
||||
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
|
||||
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
|
||||
* <p>
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating
|
||||
* {@code TimeWindowedKStream} is an abstraction of a <em>windowed</em> record stream of {@link Record key-value} pairs.
|
||||
* It is an intermediate representation of a {@link KStream}, that is aggregated into a windowed {@link KTable}
|
||||
* (a <em>windowed</em> {@link KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
|
||||
*
|
||||
* <p>A {@code TimeWindowedKStream} can represent one of four different windowed types:
|
||||
* <ul>
|
||||
* <li>a {@link TimeWindows tumbling/hopping} window</li>
|
||||
* <li>a {@link UnlimitedWindows landmark} window</li>
|
||||
* <li>a {@link SlidingWindows sliding} window</li>
|
||||
* </ul>
|
||||
*
|
||||
* The result of the aggregation is written into a local {@link WindowStore} (which is basically an ever-updating
|
||||
* materialized view) that can be queried using the name provided in the {@link Materialized} instance.
|
||||
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
|
||||
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
|
||||
* New events are added to {@link TimeWindows} until their grace period ends (see {@link TimeWindows#ofSizeAndGrace(Duration, Duration)}).
|
||||
* <p>
|
||||
* A {@code TimeWindowedKStream} must be obtained from a {@link KGroupedStream} via
|
||||
* {@link KGroupedStream#windowedBy(Windows)}.
|
||||
* New events are added to {@link TimeWindows}/{@link SlidingWindows} until their grace period ends
|
||||
* (see {@link TimeWindows#ofSizeAndGrace(Duration, Duration)} or
|
||||
* {@link SlidingWindows#ofTimeDifferenceAndGrace(Duration, Duration)};
|
||||
* note that landmark windows don't have a grace period).
|
||||
*
|
||||
* @param <K> Type of keys
|
||||
* @param <V> Type of values
|
||||
* @see KStream
|
||||
* @see KGroupedStream
|
||||
* <p>A {@code TimeWindowedKStream} is obtained from a {@link KStream} by {@link KStream#groupByKey() grouping} and
|
||||
* windowing
|
||||
* (cf. {@link KGroupedStream#windowedBy(Windows) KGroupedStream#windowedBy(Windows) [for tumbling/hopping/landmark]}
|
||||
* or {@link KGroupedStream#windowedBy(SlidingWindows)}).
|
||||
*
|
||||
* @param <K> the key type of this time-windowed stream
|
||||
* @param <V> the value type of this time-windowed stream
|
||||
*
|
||||
* @see SessionWindowedKStream
|
||||
*/
|
||||
public interface TimeWindowedKStream<K, V> {
|
||||
|
||||
|
@ -216,228 +224,11 @@ public interface TimeWindowedKStream<K, V> {
|
|||
KTable<Windowed<K>, Long> count(final Named named,
|
||||
final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view).
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
|
||||
* <p>
|
||||
* The default key and value serde from the config will be used for serializing the result.
|
||||
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
* an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
|
||||
* and "-changelog" is a fixed suffix.
|
||||
* Note that the internal store name may not be queryable through Interactive Queries.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param <VR> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view).
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
|
||||
* <p>
|
||||
* The default key and value serde from the config will be used for serializing the result.
|
||||
* If a different serde is required then you should use
|
||||
* {@link #aggregate(Initializer, Aggregator, Named, Materialized)}.
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
* an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
|
||||
* and "-changelog" is a fixed suffix.
|
||||
* Note that the internal store name may not be queryable through Interactive Queries.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
|
||||
* @param <VR> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view)
|
||||
* that can be queried using the store name as provided with {@link Materialized}.
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
|
||||
* <p>
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
|
||||
* <pre>{@code
|
||||
* KafkaStreams streams = ... // counting words
|
||||
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
|
||||
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
|
||||
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
|
||||
*
|
||||
* K key = "some-word";
|
||||
* long fromTime = ...;
|
||||
* long toTime = ...;
|
||||
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
|
||||
* }</pre>
|
||||
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
|
||||
* query the value of the key on a parallel running instance of your Kafka Streams application.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
|
||||
* is specified in the parameter {@code materialized}) will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
|
||||
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
|
||||
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
|
||||
* name of the store defined in {@link Materialized}, and "-changelog" is a fixed suffix.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
|
||||
* @param <VR> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view)
|
||||
* that can be queried using the store name as provided with {@link Materialized}.
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
|
||||
* <p>
|
||||
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates
|
||||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
|
||||
* <pre>{@code
|
||||
* KafkaStreams streams = ... // counting words
|
||||
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
|
||||
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
|
||||
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
|
||||
*
|
||||
* K key = "some-word";
|
||||
* long fromTime = ...;
|
||||
* long toTime = ...;
|
||||
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
|
||||
* }</pre>
|
||||
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
|
||||
* query the value of the key on a parallel running instance of your Kafka Streams application.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
|
||||
* is specified in the parameter {@code materialized}) will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
|
||||
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
|
||||
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
|
||||
* name of the store defined in {@link Materialized}, and "-changelog" is a fixed suffix.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
|
||||
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
|
||||
* @param <VR> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Combine the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Combining implies that the type of the aggregate result is the same as the type of the input value
|
||||
* (c.f. {@link #aggregate(Initializer, Aggregator)}).
|
||||
* (cf. {@link #aggregate(Initializer, Aggregator)}).
|
||||
* <p>
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view).
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
|
@ -655,6 +446,223 @@ public interface TimeWindowedKStream<K, V> {
|
|||
final Named named,
|
||||
final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view).
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
|
||||
* <p>
|
||||
* The default key and value serde from the config will be used for serializing the result.
|
||||
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
* an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
|
||||
* and "-changelog" is a fixed suffix.
|
||||
* Note that the internal store name may not be queryable through Interactive Queries.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param <VOut> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view).
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
|
||||
* <p>
|
||||
* The default key and value serde from the config will be used for serializing the result.
|
||||
* If a different serde is required then you should use
|
||||
* {@link #aggregate(Initializer, Aggregator, Named, Materialized)}.
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
* an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
|
||||
* and "-changelog" is a fixed suffix.
|
||||
* Note that the internal store name may not be queryable through Interactive Queries.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
|
||||
* @param <VOut> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view)
|
||||
* that can be queried using the store name as provided with {@link Materialized}.
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
|
||||
* <p>
|
||||
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
|
||||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
|
||||
* <pre>{@code
|
||||
* KafkaStreams streams = ... // counting words
|
||||
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
|
||||
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
|
||||
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
|
||||
*
|
||||
* K key = "some-word";
|
||||
* long fromTime = ...;
|
||||
* long toTime = ...;
|
||||
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
|
||||
* }</pre>
|
||||
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
|
||||
* query the value of the key on a parallel running instance of your Kafka Streams application.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
|
||||
* is specified in the parameter {@code materialized}) will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
|
||||
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
|
||||
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
|
||||
* name of the store defined in {@link Materialized}, and "-changelog" is a fixed suffix.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
|
||||
* @param <VOut> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Aggregate the values of records in this stream by the grouped key and defined windows.
|
||||
* Records with {@code null} key or value are ignored.
|
||||
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
|
||||
* allows the result to have a different type than the input values.
|
||||
* The result is written into a local {@link WindowStore} (which is basically an ever-updating materialized view)
|
||||
* that can be queried using the store name as provided with {@link Materialized}.
|
||||
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
|
||||
* <p>
|
||||
* The specified {@link Initializer} is applied directly before the first input record (per key) in each window is
|
||||
* processed to provide an initial intermediate aggregation result that is used to process the first record for
|
||||
* the window (per key).
|
||||
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
|
||||
* aggregate (or for the very first record using the intermediate aggregation result provided via the
|
||||
* {@link Initializer}) and the record's value.
|
||||
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
|
||||
* <p>
|
||||
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates
|
||||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
|
||||
* <pre>{@code
|
||||
* KafkaStreams streams = ... // counting words
|
||||
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
|
||||
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
|
||||
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
|
||||
*
|
||||
* K key = "some-word";
|
||||
* long fromTime = ...;
|
||||
* long toTime = ...;
|
||||
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
|
||||
* }</pre>
|
||||
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
|
||||
* query the value of the key on a parallel running instance of your Kafka Streams application.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
|
||||
* is specified in the parameter {@code materialized}) will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
|
||||
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
|
||||
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
|
||||
* user-specified in {@link StreamsConfig} via parameter
|
||||
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
|
||||
* name of the store defined in {@link Materialized}, and "-changelog" is a fixed suffix.
|
||||
* <p>
|
||||
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
|
||||
*
|
||||
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
|
||||
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
|
||||
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
|
||||
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
|
||||
* @param <VOut> the value type of the resulting {@link KTable}
|
||||
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
*/
|
||||
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Configure when the aggregated result will be emitted for {@code TimeWindowedKStream}.
|
||||
* <p>
|
||||
|
|
|
@ -104,53 +104,6 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
|
|||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator) {
|
||||
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named) {
|
||||
return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
|
||||
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
|
||||
Objects.requireNonNull(initializer, "initializer can't be null");
|
||||
Objects.requireNonNull(aggregator, "aggregator can't be null");
|
||||
Objects.requireNonNull(materialized, "materialized can't be null");
|
||||
final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
|
||||
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
|
||||
if (materializedInternal.keySerde() == null) {
|
||||
materializedInternal.withKeySerde(keySerde);
|
||||
}
|
||||
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
|
||||
final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
|
||||
|
||||
return aggregateBuilder.buildWindowed(
|
||||
new NamedInternal(aggregateName),
|
||||
storeFactory.storeName(),
|
||||
windows.gracePeriodMs(),
|
||||
new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
|
||||
return reduce(reducer, NamedInternal.empty());
|
||||
|
@ -192,20 +145,68 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
|
|||
new NamedInternal(reduceName),
|
||||
storeFactory.storeName(),
|
||||
windows.gracePeriodMs(),
|
||||
new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
|
||||
new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorFromReducer(reducer)),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
false);
|
||||
}
|
||||
|
||||
private Aggregator<K, V, V> aggregatorFromReducer(final Reducer<V> reducer) {
|
||||
return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator) {
|
||||
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named) {
|
||||
return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized) {
|
||||
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized) {
|
||||
Objects.requireNonNull(initializer, "initializer can't be null");
|
||||
Objects.requireNonNull(aggregator, "aggregator can't be null");
|
||||
Objects.requireNonNull(materialized, "materialized can't be null");
|
||||
final MaterializedInternal<K, VOut, WindowStore<Bytes, byte[]>> materializedInternal =
|
||||
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
|
||||
if (materializedInternal.keySerde() == null) {
|
||||
materializedInternal.withKeySerde(keySerde);
|
||||
}
|
||||
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
|
||||
final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
|
||||
|
||||
return aggregateBuilder.buildWindowed(
|
||||
new NamedInternal(aggregateName),
|
||||
storeFactory.storeName(),
|
||||
windows.gracePeriodMs(),
|
||||
new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
|
||||
this.emitStrategy = emitStrategy;
|
||||
return this;
|
||||
}
|
||||
|
||||
private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
|
||||
return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -121,60 +121,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
|
|||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator) {
|
||||
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named) {
|
||||
return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
|
||||
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
|
||||
final Aggregator<? super K, ? super V, VR> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
|
||||
Objects.requireNonNull(initializer, "initializer can't be null");
|
||||
Objects.requireNonNull(aggregator, "aggregator can't be null");
|
||||
Objects.requireNonNull(materialized, "materialized can't be null");
|
||||
final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
|
||||
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
|
||||
if (materializedInternal.keySerde() == null) {
|
||||
materializedInternal.withKeySerde(keySerde);
|
||||
}
|
||||
|
||||
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
|
||||
final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
|
||||
|
||||
return aggregateBuilder.buildWindowed(
|
||||
new NamedInternal(aggregateName),
|
||||
storeFactory.storeName(),
|
||||
windows.gracePeriodMs(),
|
||||
new KStreamWindowAggregate<>(
|
||||
windows,
|
||||
storeFactory,
|
||||
emitStrategy,
|
||||
initializer,
|
||||
aggregator),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
|
||||
return reduce(reducer, NamedInternal.empty());
|
||||
|
@ -221,7 +167,65 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
|
|||
storeFactory,
|
||||
emitStrategy,
|
||||
aggregateBuilder.reduceInitializer,
|
||||
aggregatorForReducer(reducer)),
|
||||
aggregatorFromReducer(reducer)),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
false);
|
||||
}
|
||||
|
||||
private Aggregator<K, V, V> aggregatorFromReducer(final Reducer<V> reducer) {
|
||||
return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator) {
|
||||
return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named) {
|
||||
return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized) {
|
||||
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
|
||||
final Aggregator<? super K, ? super V, VOut> aggregator,
|
||||
final Named named,
|
||||
final Materialized<K, VOut, WindowStore<Bytes, byte[]>> materialized) {
|
||||
Objects.requireNonNull(initializer, "initializer can't be null");
|
||||
Objects.requireNonNull(aggregator, "aggregator can't be null");
|
||||
Objects.requireNonNull(materialized, "materialized can't be null");
|
||||
final MaterializedInternal<K, VOut, WindowStore<Bytes, byte[]>> materializedInternal =
|
||||
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
|
||||
if (materializedInternal.keySerde() == null) {
|
||||
materializedInternal.withKeySerde(keySerde);
|
||||
}
|
||||
|
||||
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
|
||||
final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
|
||||
|
||||
return aggregateBuilder.buildWindowed(
|
||||
new NamedInternal(aggregateName),
|
||||
storeFactory.storeName(),
|
||||
windows.gracePeriodMs(),
|
||||
new KStreamWindowAggregate<>(
|
||||
windows,
|
||||
storeFactory,
|
||||
emitStrategy,
|
||||
initializer,
|
||||
aggregator),
|
||||
materializedInternal.queryableStoreName(),
|
||||
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
|
||||
materializedInternal.valueSerde(),
|
||||
|
@ -238,7 +242,4 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
|
|||
return this;
|
||||
}
|
||||
|
||||
private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
|
||||
return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue