MINOR: cleanup top level class JavaDocs for main interfaces of Kafka Streams DSL (4/N) (#18884)

Reviewers: Bill Bejeck <bill@confluent.io>
Author: Matthias J. Sax | 2025-02-18 16:22:18 -08:00 | committed by GitHub
parent 490ba8a8a3
commit 900d81b345
4 changed files with 280 additions and 305 deletions

streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java

@@ -18,38 +18,14 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.SessionStore;
-import java.time.Duration;
/**
- * {@code SessionWindowedCogroupKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
- * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation
- * on the original {@link KGroupedStream} records resulting in a windowed {@link KTable} (a <emph>windowed</emph>
- * {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
- * <p>
- * {@link SessionWindows} are dynamic data driven windows.
- * They have no fixed time boundaries, rather the size of the window is determined by the records.
- * <p>
- * The result is written into a local {@link SessionStore} (which is basically an ever-updating
- * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
- * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
- * <p>
- * A {@code SessionWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via
- * {@link CogroupedKStream#windowedBy(SessionWindows)}.
- *
- * @param <K> Type of keys
- * @param <V> Type of values
- * @see KStream
- * @see KGroupedStream
- * @see SessionWindows
- * @see CogroupedKStream
+ * Same as a {@link SessionWindowedKStream}, however, for multiple co-grouped {@link KStream KStreams}.
 */
public interface SessionWindowedCogroupedKStream<K, V> {
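The new one-line summary defers to SessionWindowedKStream for the details. For orientation, here is a minimal, hedged sketch of how such a co-grouped session-windowed stream is obtained and aggregated; topic names, serdes, gap/grace values, and the summing logic are illustrative assumptions, not part of this commit.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class CogroupedSessionSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Two independently grouped streams over the same key type (topic names are assumptions).
        final KGroupedStream<String, Long> clicks = builder
            .stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey();
        final KGroupedStream<String, Long> views = builder
            .stream("views", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey();

        // Each co-grouped input contributes its own Aggregator; here both just sum.
        final Aggregator<String, Long, Long> sum = (key, value, aggregate) -> aggregate + value;

        // Co-group, window by sessions (5 min inactivity gap, no grace), then aggregate.
        final KTable<Windowed<String>, Long> totals = clicks
            .cogroup(sum)
            .cogroup(views, sum)
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ZERO))
            .aggregate(() -> 0L,                            // Initializer: seed for a new session
                       (key, agg1, agg2) -> agg1 + agg2,    // Merger: combine overlapping sessions
                       Materialized.with(Serdes.String(), Serdes.Long()));
    }
}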

streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java

@@ -18,37 +18,35 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.SessionStore;
import java.time.Duration;
/**
- * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
- * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is
- * applied to the new (partitioned) windows resulting in a windowed {@link KTable} (a <emph>windowed</emph>
- * {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
- * <p>
- * {@link SessionWindows} are dynamic data driven windows.
- * They have no fixed time boundaries, rather the size of the window is determined by the records.
- * <p>
- * The result is written into a local {@link SessionStore} (which is basically an ever-updating
- * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
- * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
- * <p>
- * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via
- * {@link KGroupedStream#windowedBy(SessionWindows)}.
- *
- * @param <K> Type of keys
- * @param <V> Type of values
- * @see KStream
- * @see KGroupedStream
- * @see SessionWindows
+ * {@code SessionWindowedKStream} is an abstraction of a <em>windowed</em> record stream of {@link Record key-value} pairs.
+ * It is an intermediate representation of a {@link KStream}, that is aggregated into a windowed {@link KTable}
+ * (a <em>windowed</em> {@link KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
+ *
+ * <p>A {@code SessionWindowedKStream} represents a {@link SessionWindows session window} type.
+ *
+ * <p>The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * New events are added to {@link SessionWindows} until their grace period ends
+ * (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
+ *
+ * <p>A {@code SessionWindowedKStream} is obtained from a {@link KStream} by {@link KStream#groupByKey() grouping} and
+ * {@link KGroupedStream#windowedBy(SessionWindows) windowing}.
+ *
+ * @param <K> the key type of this session-windowed stream
+ * @param <V> the value type of this session-windowed stream
+ *
+ * @see TimeWindowedKStream
 */
public interface SessionWindowedKStream<K, V> {
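To make the "grouping and windowing" path of the rewritten JavaDoc concrete, a minimal sketch follows; the topic name, serdes, and gap/grace values are assumptions, and the later sketches in this page build on this `sessionized` stream.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionWindowingSketch {
    static final StreamsBuilder builder = new StreamsBuilder();

    // Sessions close after 30 seconds of inactivity; out-of-order records are
    // still accepted for another 10 seconds (the grace period).
    static final SessionWindowedKStream<String, String> sessionized = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(10)));
}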
@@ -209,238 +207,12 @@ public interface SessionWindowedKStream<K, V> {
KTable<Windowed<K>, Long> count(final Named named,
                                final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);
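Continuing the sketch above, the count() overload whose signature appears here could be used as follows; the processor and store names are illustrative assumptions.

// One count per key and session, materialized into a queryable store.
KTable<Windowed<String>, Long> counts = sessionized.count(
    Named.as("session-count"),
    Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-counts")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));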
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* The default key and value serde from the config will be used for serializing the result.
* If a different serde is required then you should use
* {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, VR> sessionMerger);
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* The default key and value serde from the config will be used for serializing the result.
* If a different serde is required then you should use
* {@link #aggregate(Initializer, Aggregator, Merger, Named, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, VR> sessionMerger,
final Named named);
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, VR> sessionMerger,
final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, VR> sessionMerger,
final Named named,
final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
/**
 * Combine the values of records in this stream by the grouped key and defined sessions.
 * Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
 * Records with {@code null} key or value are ignored.
 * Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}).
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 * The default key and value serde from the config will be used for serializing the result.
@@ -485,7 +257,7 @@ public interface SessionWindowedKStream<K, V> {
 * Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
 * Records with {@code null} key or value are ignored.
 * Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}).
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 * The default key and value serde from the config will be used for serializing the result.
@@ -531,7 +303,7 @@ public interface SessionWindowedKStream<K, V> {
 * Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
 * Records with {@code null} key or value are ignored.
 * Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}).
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
 * that can be queried using the store name as provided with {@link Materialized}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -592,7 +364,7 @@ public interface SessionWindowedKStream<K, V> {
 * Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
 * Records with {@code null} key or value are ignored.
 * Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * (cf. {@link #aggregate(Initializer, Aggregator, Merger)}).
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
 * that can be queried using the store name as provided with {@link Materialized}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -650,6 +422,232 @@ public interface SessionWindowedKStream<K, V> {
                                          final Named named,
                                          final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
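The reduce() overload whose signature closes above combines same-typed values per session. A sketch continuing the earlier example; the names and the concatenation logic are assumptions.

// The result type equals the input value type, which is what distinguishes
// reduce() from aggregate().
KTable<Windowed<String>, String> concatenated = sessionized.reduce(
    (value1, value2) -> value1 + "," + value2,
    Named.as("session-reduce"),
    Materialized.<String, String, SessionStore<Bytes, byte[]>>as("session-concat"));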
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
* <p>
* The default key and value serde from the config will be used for serializing the result.
* If a different serde is required then you should use
* {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param <VOut> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VOut> aggregator,
final Merger<? super K, VOut> sessionMerger);
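To make the Initializer/Aggregator/Merger interplay documented above concrete, a sketch continuing the earlier example; the concatenation logic is an assumption.

// Session-scoped concatenation via aggregate(); default serdes serialize the result.
KTable<Windowed<String>, String> aggregated = sessionized.aggregate(
    () -> "",                                      // Initializer: seed before the first record of a session
    (key, value, aggregate) -> aggregate + value,  // Aggregator: fold each record into the running aggregate
    (key, agg1, agg2) -> agg1 + agg2);             // Merger: combine two sessions when their windows overlap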
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view).
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
* <p>
* The default key and value serde from the config will be used for serializing the result.
* If a different serde is required then you should use
* {@link #aggregate(Initializer, Aggregator, Merger, Named, Materialized)}.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
* @param <VOut> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VOut> aggregator,
final Merger<? super K, VOut> sessionMerger,
final Named named);
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
* @param <VOut> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VOut> aggregator,
final Merger<? super K, VOut> sessionMerger,
final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized);
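Continuing the sketch, the Materialized variant documented above makes the store queryable by name, as in the interactive-query example in the JavaDoc; store name and serdes here are assumptions.

KTable<Windowed<String>, String> aggregated = sessionized.aggregate(
    () -> "",
    (key, value, aggregate) -> aggregate + value,
    (key, agg1, agg2) -> agg1 + agg2,
    Materialized.<String, String, SessionStore<Bytes, byte[]>>as("session-agg-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));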
/**
* Aggregate the values of records in this stream by the grouped key and defined sessions.
* Note that sessions are generated on a per-key basis and records with different keys create independent sessions.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
* that can be queried using the store name as provided with {@link Materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied directly before the first input record per session is processed to
* provide an initial intermediate aggregation result that is used to process the first record per session.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* The specified {@link Merger} is used to merge two existing sessions into one, i.e., when the windows overlap,
* they are merged into a single session and the old sessions are discarded.
* Thus, {@code aggregate()} can be used to compute aggregate functions like count (cf. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name defined by the {@link Materialized} instance must be a valid Kafka topic name and
* cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided store name defined in {@link Materialized}, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param aggregator an {@link Aggregator} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionMerger a {@link Merger} that combines two aggregation results. Cannot be {@code null}.
* @param named a {@link Named} config used to name the processor in the topology. Cannot be {@code null}.
* @param materialized a {@link Materialized} config used to materialize a state store. Cannot be {@code null}.
* @param <VOut> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key per session
*/
<VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
final Aggregator<? super K, ? super V, VOut> aggregator,
final Merger<? super K, VOut> sessionMerger,
final Named named,
final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized);
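As the JavaDoc repeatedly notes, Topology#describe() reveals the generated internal store and changelog topic names; continuing the sketch:

// Build the topology and print its description, including internal names.
Topology topology = builder.build();
System.out.println(topology.describe());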
/**
 * Configure when the aggregated result will be emitted for {@code SessionWindowedKStream}.
 * <p>

streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java

@@ -48,7 +48,7 @@ import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
 * We'd have 2 sessions for key A.
 * One starting from time 10 and ending at time 12 and another starting and ending at time 20.
 * The length of the session is driven by the timestamps of the data within the session.
- * Thus, session windows are no fixed-size windows (c.f. {@link TimeWindows} and {@link JoinWindows}).
+ * Thus, session windows are no fixed-size windows (cf. {@link TimeWindows} and {@link JoinWindows}).
 * <p>
 * If we then received another record:
 * <pre>
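A hedged sketch of a window definition consistent with the scenario above; the exact 5 ms gap is an assumption that merely fits the described timestamps.

import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;

// Records for key A at times 10 and 12 fall within the 5 ms gap and merge into
// session [10, 12]; the record at time 20 starts its own session [20, 20].
SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(5), Duration.ZERO);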

streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java

@@ -90,12 +90,6 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
        return doCount(named, materialized);
    }
-    @Override
-    public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
-        this.emitStrategy = emitStrategy;
-        return this;
-    }
    private KTable<Windowed<K>, Long> doCount(final Named named,
                                              final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal =
@@ -152,7 +146,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
-        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
+        final Aggregator<K, V, V> reduceAggregator = aggregatorFromReducer(reducer);
        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
        if (materializedInternal.keySerde() == null) {
@@ -176,7 +170,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
                emitStrategy,
                aggregateBuilder.reduceInitializer,
                reduceAggregator,
-                mergerForAggregator(reduceAggregator)
+                mergerFromAggregator(reduceAggregator)
            ),
            materializedInternal.queryableStoreName(),
            materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
@@ -184,40 +178,48 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
            false);
    }
+    private Aggregator<K, V, V> aggregatorFromReducer(final Reducer<V> reducer) {
+        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
+    }
+
+    private Merger<K, V> mergerFromAggregator(final Aggregator<K, V, V> aggregator) {
+        return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, aggOne);
+    }
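These two (renamed) helpers are how reduce() is routed through the aggregate code path: the null check seeds the aggregate from the first record of a session, and the derived merger reuses the lifted aggregator, feeding one partial aggregate in as the "value". A type-concretized restatement of the same lifting, using only the shapes the diff shows; the String types are illustrative.

Reducer<String> reducer = (v1, v2) -> v1 + "," + v2;

// A Reducer lifted to an Aggregator: a null aggregate means "first record of the session".
Aggregator<String, String, String> aggregator =
    (key, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);

// A Merger derived from that Aggregator, treating one partial aggregate as the next "value".
Merger<String, String> merger =
    (key, aggOne, aggTwo) -> aggregator.apply(key, aggTwo, aggOne);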
    @Override
-    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                final Aggregator<? super K, ? super V, T> aggregator,
-                                                final Merger<? super K, T> sessionMerger) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger) {
        return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty());
    }

    @Override
-    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                final Aggregator<? super K, ? super V, T> aggregator,
-                                                final Merger<? super K, T> sessionMerger,
-                                                final Named named) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Named named) {
        return aggregate(initializer, aggregator, sessionMerger, named, Materialized.with(keySerde, null));
    }

    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Merger<? super K, VR> sessionMerger,
-                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized) {
        return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty(), materialized);
    }

    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Merger<? super K, VR> sessionMerger,
-                                                  final Named named,
-                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+    public <VOut> KTable<Windowed<K>, VOut> aggregate(final Initializer<VOut> initializer,
+                                                      final Aggregator<? super K, ? super V, VOut> aggregator,
+                                                      final Merger<? super K, VOut> sessionMerger,
+                                                      final Named named,
+                                                      final Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal =
+        final MaterializedInternal<K, VOut, SessionStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
@@ -245,11 +247,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
            false);
    }
-    private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
-        return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, aggOne);
-    }
-
-    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
-        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
-    }
+    @Override
+    public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
+        this.emitStrategy = emitStrategy;
+        return this;
+    }
}