diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java index dc672e95eb6..498fbf65dbb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java @@ -18,38 +18,14 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.state.SessionStore; -import java.time.Duration; /** - * {@code SessionWindowedCogroupKStream} is an abstraction of a windowed record stream of {@link KeyValue} pairs. - * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation - * on the original {@link KGroupedStream} records resulting in a windowed {@link KTable} (a windowed - * {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed}). - *

- * {@link SessionWindows} are dynamic data driven windows. - * They have no fixed time boundaries, rather the size of the window is determined by the records. - *

- * The result is written into a local {@link SessionStore} (which is basically an ever-updating - * materialized view) that can be queried using the name provided in the {@link Materialized} instance. - * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where - * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. - * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}). - *

- * A {@code SessionWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via - * {@link CogroupedKStream#windowedBy(SessionWindows)}. - * - * @param Type of keys - * @param Type of values - * @see KStream - * @see KGroupedStream - * @see SessionWindows - * @see CogroupedKStream + * Same as a {@link SessionWindowedKStream}, however, for multiple co-grouped {@link KStream KStreams}. */ public interface SessionWindowedCogroupedKStream { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index 3dc22e682f1..f18c7c2af54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -18,37 +18,35 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.SessionStore; import java.time.Duration; /** - * {@code SessionWindowedKStream} is an abstraction of a windowed record stream of {@link KeyValue} pairs. - * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is - * applied to the new (partitioned) windows resulting in a windowed {@link KTable} (a windowed - * {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed}). - *

- * {@link SessionWindows} are dynamic data driven windows. - * They have no fixed time boundaries, rather the size of the window is determined by the records. - *

- * The result is written into a local {@link SessionStore} (which is basically an ever-updating + * {@code SessionWindowedKStream} is an abstraction of a windowed record stream of {@link Record key-value} pairs. + * It is an intermediate representation of a {@link KStream}, that is aggregated into a windowed {@link KTable} + * (a windowed {@link KTable} is a {@link KTable} with key type {@link Windowed Windowed}). + * + *

A {@code SessionWindowedKStream} represents a {@link SessionWindows session window} type. + * + *

The result is written into a local {@link SessionStore} (which is basically an ever-updating * materialized view) that can be queried using the name provided in the {@link Materialized} instance. * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. - * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}). - *

- * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via - * {@link KGroupedStream#windowedBy(SessionWindows)}. + * New events are added to {@link SessionWindows} until their grace period ends + * (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}). * - * @param Type of keys - * @param Type of values - * @see KStream - * @see KGroupedStream - * @see SessionWindows + *

+ * <p>A {@code SessionWindowedKStream} is obtained from a {@link KStream} by {@link KStream#groupByKey() grouping} and
+ * {@link KGroupedStream#windowedBy(SessionWindows) windowing}.
+ *
+ * @param <K> the key type of this session-windowed stream
+ * @param <V> the value type of this session-windowed stream
+ *
+ * @see TimeWindowedKStream
  */
 public interface SessionWindowedKStream<K, V> {
@@ -209,238 +207,12 @@ public interface SessionWindowedKStream<K, V> {
     KTable<Windowed<K>, Long> count(final Named named,
                                     final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);
-    /**
-     * Aggregate the values of records in this stream by the grouped key and defined sessions.
-     * Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
-     * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
-     * allows the result to have a different type than the input values.
-     * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>

- * The specified {@link Initializer} is applied directly before the first input record per session is processed to - * provide an initial intermediate aggregation result that is used to process the first record per session. - * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current - * aggregate (or for the very first record using the intermediate aggregation result provided via the - * {@link Initializer}) and the record's value. - * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, - * they are merged into a single session and the old sessions are discarded. - * Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}). - *

- * The default key and value serde from the config will be used for serializing the result. - * If a different serde is required then you should use - * {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}. - *

- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to - * the same window and key. - * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of - * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and - * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. - *

- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name - * and "-changelog" is a fixed suffix. - * Note that the internal store name may not be queryable through Interactive Queries. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - * - * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. - * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. - * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} - * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key per session - */ - KTable, VR> aggregate(final Initializer initializer, - final Aggregator aggregator, - final Merger sessionMerger); - - /** - * Aggregate the values of records in this stream by the grouped key and defined sessions. - * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. - * Records with {@code null} key or value are ignored. - * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, - * allows the result to have a different type than the input values. - * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view). - * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. - *

- * The specified {@link Initializer} is applied directly before the first input record per session is processed to - * provide an initial intermediate aggregation result that is used to process the first record per session. - * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current - * aggregate (or for the very first record using the intermediate aggregation result provided via the - * {@link Initializer}) and the record's value. - * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, - * they are merged into a single session and the old sessions are discarded. - * Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}). - *

- * The default key and value serde from the config will be used for serializing the result. - * If a different serde is required then you should use - * {@link #aggregate(Initializer, Aggregator, Merger, Named, Materialized)}. - *

- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to - * the same window and key. - * The rate of propagated updates depends on your input data rate, the number of distinct - * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and - * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. - *

- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name - * and "-changelog" is a fixed suffix. - * Note that the internal store name may not be queryable through Interactive Queries. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - * - * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. - * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. - * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. - * @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} - * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key per session - */ - KTable, VR> aggregate(final Initializer initializer, - final Aggregator aggregator, - final Merger sessionMerger, - final Named named); - - /** - * Aggregate the values of records in this stream by the grouped key and defined sessions. - * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. - * Records with {@code null} key or value are ignored. - * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, - * allows the result to have a different type than the input values. - * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) - * that can be queried using the store name as provided with {@link Materialized}. - * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. - *

- * The specified {@link Initializer} is applied directly before the first input record per session is processed to - * provide an initial intermediate aggregation result that is used to process the first record per session. - * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current - * aggregate (or for the very first record using the intermediate aggregation result provided via the - * {@link Initializer}) and the record's value. - * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, - * they are merged into a single session and the old sessions are discarded. - * Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}). - *

- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to - * the same window and key if caching is enabled on the {@link Materialized} instance. - * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of - * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and - * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. - *

-     * To query the local {@link SessionStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
-     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeQueryParams);
-     * String key = "some-key";
-     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - *

- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and - * cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide store name defined in {@link Materialized}, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - * - * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. - * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. - * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. - * @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} - * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key per session - */ - KTable, VR> aggregate(final Initializer initializer, - final Aggregator aggregator, - final Merger sessionMerger, - final Materialized> materialized); - - /** - * Aggregate the values of records in this stream by the grouped key and defined sessions. - * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. - * Records with {@code null} key or value are ignored. - * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, - * allows the result to have a different type than the input values. - * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) - * that can be queried using the store name as provided with {@link Materialized}. - * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. - *

- * The specified {@link Initializer} is applied directly before the first input record per session is processed to - * provide an initial intermediate aggregation result that is used to process the first record per session. - * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current - * aggregate (or for the very first record using the intermediate aggregation result provided via the - * {@link Initializer}) and the record's value. - * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, - * they are merged into a single session and the old sessions are discarded. - * Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}). - *

- * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates - * to the same window and key if caching is enabled on the {@link Materialized} instance. - * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct - * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and - * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. - *

-     * To query the local {@link SessionStore} it must be obtained via
-     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
-     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeQueryParams);
-     * String key = "some-key";
-     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - *

- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and - * cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide store name defined in {@link Materialized}, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - * - * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. - * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. - * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. - * @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}. - * @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}. - * @param the value type of the resulting {@link KTable} - * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key per session - */ - KTable, VR> aggregate(final Initializer initializer, - final Aggregator aggregator, - final Merger sessionMerger, - final Named named, - final Materialized> materialized); - /** * Combine the values of records in this stream by the grouped key and defined sessions. * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}). * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view). * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * The default key and value serde from the config will be used for serializing the result. @@ -485,7 +257,7 @@ public interface SessionWindowedKStream { * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}). * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view). * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * The default key and value serde from the config will be used for serializing the result. @@ -531,7 +303,7 @@ public interface SessionWindowedKStream { * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}). * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) * that can be queried using the store name as provided with {@link Materialized}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -592,7 +364,7 @@ public interface SessionWindowedKStream { * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. 
* Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}). * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) * that can be queried using the store name as provided with {@link Materialized}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -650,6 +422,232 @@ public interface SessionWindowedKStream { final Named named, final Materialized> materialized); + /** + * Aggregate the values of records in this stream by the grouped key and defined sessions. + * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view). + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied directly before the first input record per session is processed to + * provide an initial intermediate aggregation result that is used to process the first record per session. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}). + *

+ * The default key and value serde from the config will be used for serializing the result. + * If a different serde is required then you should use + * {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queryable through Interactive Queries. + *

+ * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. + * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. + * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key per session + */ + KTable, VOut> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger); + + /** + * Aggregate the values of records in this stream by the grouped key and defined sessions. + * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view). + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied directly before the first input record per session is processed to + * provide an initial intermediate aggregation result that is used to process the first record per session. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}). + *

+ * The default key and value serde from the config will be used for serializing the result. + * If a different serde is required then you should use + * {@link #aggregate(Initializer, Aggregator, Merger, Named, Materialized)}. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct + * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queryable through Interactive Queries. + *

+ * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. + * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. + * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. + * @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key per session + */ + KTable, VOut> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Named named); + + /** + * Aggregate the values of records in this stream by the grouped key and defined sessions. + * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied directly before the first input record per session is processed to + * provide an initial intermediate aggregation result that is used to process the first record per session. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}). + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type Long
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeQueryParams);
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
+     * cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
+     * <p>

+ * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. + * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. + * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. + * @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key per session + */ + KTable, VOut> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Materialized> materialized); + + /** + * Aggregate the values of records in this stream by the grouped key and defined sessions. + * Note that sessions are generated on a per-key basis and records with different keys create independent sessions. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied directly before the first input record per session is processed to + * provide an initial intermediate aggregation result that is used to process the first record per session. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}). + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates + * to the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct + * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type Long
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeQueryParams);
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
+     * cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
+     * <p>

+ * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}. + * @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}. + * @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}. + * @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}. + * @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key per session + */ + KTable, VOut> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Named named, + final Materialized> materialized); + /** * Configure when the aggregated result will be emitted for {@code SessionWindowedKStream}. *

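For illustration, here is a minimal usage sketch of the session-windowed aggregate() API documented above. The topic name "clicks", the store name "clicks-per-session", and the window durations are assumptions for this sketch, not part of the patch:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.SessionStore;

    public class SessionAggregateSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Count events per key and session: a session closes after 5 minutes of
            // inactivity, and out-of-order records are accepted for another minute.
            final KTable<Windowed<String>, Long> sessionCounts = builder
                .<String, String>stream("clicks")                 // hypothetical input topic
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .aggregate(
                    () -> 0L,                                     // Initializer: seed for a new session
                    (key, value, agg) -> agg + 1L,                // Aggregator: fold one record into the aggregate
                    (key, left, right) -> left + right,           // Merger: combine two sessions that grew together
                    Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("clicks-per-session")
                        .withValueSerde(Serdes.Long()));
        }
    }
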
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index c5fabdb72cb..e054446ff27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -48,7 +48,7 @@ import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
  * We'd have 2 sessions for key A.
  * One starting from time 10 and ending at time 12 and another starting and ending at time 20.
  * The length of the session is driven by the timestamps of the data within the session.
- * Thus, session windows are no fixed-size windows (c.f. {@link TimeWindows} and {@link JoinWindows}).
+ * Thus, session windows are not fixed-size windows (cf. {@link TimeWindows} and {@link JoinWindows}).
  * <p>

 * If we then received another record:
 * <pre>

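The merge behavior described in this hunk can be exercised with TopologyTestDriver. The following sketch is illustrative only; the topic name, application id, and the 5 ms inactivity gap are assumptions chosen to mirror the timestamps above:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.SessionWindows;

    public class SessionMergeSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input")               // hypothetical topic
                   .groupByKey()
                   .windowedBy(SessionWindows.ofInactivityGapAndGrace(
                       Duration.ofMillis(5),                      // inactivity gap, matching the example above
                       Duration.ofMillis(10)))                    // grace keeps the out-of-order record at t=15 from being dropped
                   .count();

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-merge-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> input =
                    driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
                input.pipeInput("A", "1", 10L); // opens session [10, 10]
                input.pipeInput("A", "2", 12L); // within the gap: session grows to [10, 12]
                input.pipeInput("A", "3", 20L); // more than 5 ms later: second session [20, 20]
                input.pipeInput("A", "4", 15L); // within 5 ms of both: everything merges into [10, 20]
            }
        }
    }
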
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 0c0f557b5c9..2d4c512cd57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -90,12 +90,6 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
         return doCount(named, materialized);
     }
 
-    @Override
-    public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
-        this.emitStrategy = emitStrategy;
-        return this;
-    }
-
     private KTable<Windowed<K>, Long> doCount(final Named named,
                                               final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal =
@@ -152,7 +146,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
+        final Aggregator<K, V, V> reduceAggregator = aggregatorFromReducer(reducer);
         final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
         if (materializedInternal.keySerde() == null) {
@@ -176,7 +170,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
                 emitStrategy,
                 aggregateBuilder.reduceInitializer,
                 reduceAggregator,
-                mergerForAggregator(reduceAggregator)
+                mergerFromAggregator(reduceAggregator)
             ),
             materializedInternal.queryableStoreName(),
             materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
@@ -184,40 +178,48 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
             false);
     }
 
+    private Aggregator<K, V, V> aggregatorFromReducer(final Reducer<V> reducer) {
+        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
+    }
+
+    private Merger<K, V> mergerFromAggregator(final Aggregator<K, V, V> aggregator) {
+        return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, aggOne);
+    }
+
     @Override
-    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                final Aggregator<? super K, ? super V, T> aggregator,
-                                                final Merger<? super K, T> sessionMerger) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger) {
         return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty());
     }
 
     @Override
-    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                final Aggregator<? super K, ? super V, T> aggregator,
-                                                final Merger<? super K, T> sessionMerger,
-                                                final Named named) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Named named) {
         return aggregate(initializer, aggregator, sessionMerger, named, Materialized.with(keySerde, null));
     }
 
     @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Merger<? super K, VR> sessionMerger,
-                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized) {
         return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty(), materialized);
     }
 
     @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Merger<? super K, VR> sessionMerger,
-                                                  final Named named,
-                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Named named,
+                                                      final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal =
+        final MaterializedInternal<K, VOut, SessionStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
@@ -245,11 +247,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
             false);
     }
 
-    private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
-        return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, aggOne);
+    @Override
+    public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
+        this.emitStrategy = emitStrategy;
+        return this;
     }
 
-    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
-        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
-    }
 }
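
To make the renamed helpers concrete: a Reducer is lifted into an Aggregator by treating a null aggregate as the session's first value, and the session Merger then reuses that Aggregator. The following is a self-contained, illustrative sketch of that equivalence, not code from the patch:

    import org.apache.kafka.streams.kstream.Aggregator;
    import org.apache.kafka.streams.kstream.Merger;
    import org.apache.kafka.streams.kstream.Reducer;

    public class ReduceAsAggregateSketch {
        public static void main(final String[] args) {
            final Reducer<Long> sum = Long::sum;

            // A Reducer<V> becomes an Aggregator<K, V, V>: a null aggregate marks the
            // first record of a session, whose value simply becomes the initial aggregate.
            final Aggregator<String, Long, Long> fromReducer =
                (key, value, aggregate) -> aggregate == null ? value : sum.apply(aggregate, value);

            // The session Merger reuses the aggregator, feeding one session's aggregate
            // in as the "value" and the other as the running aggregate.
            final Merger<String, Long> merger = (key, agg1, agg2) -> fromReducer.apply(key, agg2, agg1);

            System.out.println(fromReducer.apply("A", 2L, null)); // 2  (first record of a session)
            System.out.println(fromReducer.apply("A", 3L, 2L));   // 5  (running sum)
            System.out.println(merger.apply("A", 5L, 7L));        // 12 (two sessions merged)
        }
    }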