MINOR: Streams API JavaDoc improvements

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2437 from mjsax/javaDocImprovements7
This commit is contained in:
Matthias J. Sax 2017-01-26 21:50:44 -08:00 committed by Guozhang Wang
parent 89fb02aa81
commit 4277645d25
11 changed files with 404 additions and 392 deletions

View File

@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.util.Map; import java.util.Map;
@ -50,7 +51,7 @@ import java.util.Map;
* This implies, that each input record defines its own window with start and end time being relative to the record's * This implies, that each input record defines its own window with start and end time being relative to the record's
* timestamp. * timestamp.
* <p> * <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @see TimeWindows * @see TimeWindows
* @see UnlimitedWindows * @see UnlimitedWindows
@ -61,7 +62,7 @@ import java.util.Map;
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
* @see org.apache.kafka.streams.processor.TimestampExtractor * @see TimestampExtractor
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class JoinWindows extends Windows<Window> { public class JoinWindows extends Windows<Window> {

View File

@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.StreamsConfig;
/** /**
* {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs. * {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs.
@ -49,11 +50,11 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
@ -65,8 +66,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -83,7 +84,7 @@ public interface KGroupedStream<K, V> {
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
@ -116,11 +117,11 @@ public interface KGroupedStream<K, V> {
* the same window and key. * the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
@ -134,8 +135,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -158,7 +159,7 @@ public interface KGroupedStream<K, V> {
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
@ -191,7 +192,7 @@ public interface KGroupedStream<K, V> {
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
@ -221,7 +222,7 @@ public interface KGroupedStream<K, V> {
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
@ -256,8 +257,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value. * aggregate and the record's value.
@ -266,7 +267,7 @@ public interface KGroupedStream<K, V> {
* Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
@ -278,8 +279,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -307,7 +308,7 @@ public interface KGroupedStream<K, V> {
* max. * max.
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
@ -343,8 +344,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value. * aggregate and the record's value.
@ -353,7 +354,7 @@ public interface KGroupedStream<K, V> {
* Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max. * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max.
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
@ -367,8 +368,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -402,7 +403,7 @@ public interface KGroupedStream<K, V> {
* min, or max. * min, or max.
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
@ -440,8 +441,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value. * aggregate and the record's value.
@ -451,7 +452,7 @@ public interface KGroupedStream<K, V> {
* or max. * or max.
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore()); * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
@ -463,8 +464,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer} * @param reducer the instance of {@link Reducer}
@ -491,8 +492,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value. * aggregate and the record's value.
@ -502,7 +503,7 @@ public interface KGroupedStream<K, V> {
* sum, min, or max. * sum, min, or max.
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore()); * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
@ -514,8 +515,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer} * @param reducer the instance of {@link Reducer}
@ -543,8 +544,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to * The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record. * provide an initial intermediate aggregation result that is used to process the first record.
@ -555,7 +556,7 @@ public interface KGroupedStream<K, V> {
* count (c.f. {@link #count(String)}) * count (c.f. {@link #count(String)})
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
@ -567,8 +568,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -603,7 +604,7 @@ public interface KGroupedStream<K, V> {
* like count (c.f. {@link #count(String)}) * like count (c.f. {@link #count(String)})
* <p> * <p>
* To query the local {@link KeyValueStore} it must be obtained via * To query the local {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
@ -642,8 +643,8 @@ public interface KGroupedStream<K, V> {
* the same key. * the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the cache size. * parallel running Kafka Streams instances, and the cache size.
* You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter * You can configure the cache size via {@link StreamsConfig} parameter
* {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}. * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p> * <p>
* The specified {@link Initializer} is applied once per window directly before the first input record is * The specified {@link Initializer} is applied once per window directly before the first input record is
* processed to provide an initial intermediate aggregation result that is used to process the first record. * processed to provide an initial intermediate aggregation result that is used to process the first record.
@ -654,7 +655,7 @@ public interface KGroupedStream<K, V> {
* functions like count (c.f. {@link #count(String)}) * functions like count (c.f. {@link #count(String)})
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double
* ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
@ -668,8 +669,8 @@ public interface KGroupedStream<K, V> {
* <p> * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * user-specified in {@link StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provide {@code storeName}, and "-changelog" is a fixed suffix. * provide {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* *
@ -711,7 +712,7 @@ public interface KGroupedStream<K, V> {
* functions like count (c.f. {@link #count(String)}) TODO add more examples. * functions like count (c.f. {@link #count(String)}) TODO add more examples.
* <p> * <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
@ -758,7 +759,7 @@ public interface KGroupedStream<K, V> {
* aggregate functions like count (c.f. {@link #count(String)}) * aggregate functions like count (c.f. {@link #count(String)})
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double
@ -808,7 +809,7 @@ public interface KGroupedStream<K, V> {
* to compute aggregate functions like count (c.f. {@link #count(String)}) * to compute aggregate functions like count (c.f. {@link #count(String)})
* <p> * <p>
* To query the local {@link SessionStore} it must be obtained via * To query the local {@link SessionStore} it must be obtained via
* {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name: * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double

View File

@ -17,14 +17,14 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KeyValue;
/** /**
* The {@link KeyValueMapper} interface for mapping a {@link org.apache.kafka.streams.KeyValue key-value pair} to a new * The {@link KeyValueMapper} interface for mapping a {@link KeyValue key-value pair} to a new value of arbitrary type.
* value of arbitrary type. For example, it can be used to * For example, it can be used to
* <ul> * <ul>
* <li>map from an input {@link org.apache.kafka.streams.KeyValue key-value pair} to an output * <li>map from an input {@link KeyValue} pair to an output {@link KeyValue} pair with different key and/or value type
* {@link org.apache.kafka.streams.KeyValue key-value pair} with different key and/or value type (for this case * (for this case output type {@code VR == }{@link KeyValue KeyValue&lt;NewKeyType,NewValueType&gt;})</li>
* output type {@code VR == }{@link org.apache.kafka.streams.KeyValue KeyValue&lt;NewKeyType,NewValueType&gt;})</li>
* <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li> * <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li>
* </ul> * </ul>
* This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each * This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each

View File

@ -17,10 +17,10 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KeyValue;
/** /**
* The {@link Predicate} interface represents a predicate (boolean-valued function) of a * The {@link Predicate} interface represents a predicate (boolean-valued function) of a {@link KeyValue} pair.
* {@link org.apache.kafka.streams.KeyValue key-value pair}.
* This is a stateless record-by-record operation, i.e, {@link #test(Object, Object)} is invoked individually for each * This is a stateless record-by-record operation, i.e, {@link #test(Object, Object)} is invoked individually for each
* record of a stream. * record of a stream.
* *
@ -40,8 +40,7 @@ public interface Predicate<K, V> {
* *
* @param key the key of the record * @param key the key of the record
* @param value the value of the record * @param value the value of the record
* @return {@code true} if the {@link org.apache.kafka.streams.KeyValue key-value pair} satisfies the * @return {@code true} if the {@link KeyValue} pair satisfies the predicate&mdash;{@code false} otherwise
* predicate&mdash;{@code false} otherwise
*/ */
boolean test(final K key, final V value); boolean test(final K key, final V value);
} }

View File

@ -18,13 +18,13 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KeyValue;
/** /**
* The {@link Reducer} interface for combining two values of the same type into a new value. * The {@link Reducer} interface for combining two values of the same type into a new value.
* In contrast to {@link Aggregator} the result type must be the same as the input type. * In contrast to {@link Aggregator} the result type must be the same as the input type.
* <p> * <p>
* The provided values can be either original values from input {@link org.apache.kafka.streams.KeyValue KeyValue} pair * The provided values can be either original values from input {@link KeyValue} pair records or be a previously
* records or be a previously computed result from {@link Reducer#apply(Object, Object)}. * computed result from {@link Reducer#apply(Object, Object)}.
* <p> * <p>
* {@link Reducer} can be used to implement aggregation functions like sum, min, or max. * {@link Reducer} can be used to implement aggregation functions like sum, min, or max.
* *

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.TimestampExtractor;
/** /**
* A session based window specification used for aggregating events into sessions. * A session based window specification used for aggregating events into sessions.
@ -53,7 +54,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
* The previous 2 sessions would be merged into a single session with start time 10 and end time 20. * The previous 2 sessions would be merged into a single session with start time 10 and end time 20.
* The aggregate value for this session would be the result of aggregating all 4 values. * The aggregate value for this session would be the result of aggregating all 4 values.
* <p> * <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @see TimeWindows * @see TimeWindows
* @see UnlimitedWindows * @see UnlimitedWindows

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -41,7 +42,7 @@ import java.util.Map;
* For example, hopping windows with size of 5000ms and advance of 3000ms, have window boundaries * For example, hopping windows with size of 5000ms and advance of 3000ms, have window boundaries
* [0;5000),[3000;8000),... and not [1000;6000),[4000;9000),... or even something "random" like [1452;6452),[4452;9452),... * [0;5000),[3000;8000),... and not [1000;6000),[4000;9000),... or even something "random" like [1452;6452),[4452;9452),...
* <p> * <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @see SessionWindows * @see SessionWindows
* @see UnlimitedWindows * @see UnlimitedWindows
@ -52,7 +53,7 @@ import java.util.Map;
* @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see org.apache.kafka.streams.processor.TimestampExtractor * @see TimestampExtractor
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TimeWindows extends Windows<TimeWindow> { public class TimeWindows extends Windows<TimeWindow> {

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -30,7 +31,7 @@ import java.util.Map;
* It has a fixed starting point while its window end is defined as infinite. * It has a fixed starting point while its window end is defined as infinite.
* With this regard, it is a fixed-size window with infinite window size. * With this regard, it is a fixed-size window with infinite window size.
* <p> * <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @see TimeWindows * @see TimeWindows
* @see SessionWindows * @see SessionWindows
@ -41,7 +42,7 @@ import java.util.Map;
* @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see org.apache.kafka.streams.processor.TimestampExtractor * @see TimestampExtractor
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class UnlimitedWindows extends Windows<UnlimitedWindow> { public class UnlimitedWindows extends Windows<UnlimitedWindow> {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.TimestampExtractor;
/** /**
* A single window instance, defined by its start and end timestamp. * A single window instance, defined by its start and end timestamp.
@ -24,13 +25,13 @@ import org.apache.kafka.common.annotation.InterfaceStability;
* window implementations. * window implementations.
* <p> * <p>
* To specify how {@link Window} boundaries are defined use {@link Windows}. * To specify how {@link Window} boundaries are defined use {@link Windows}.
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @see Windows * @see Windows
* @see org.apache.kafka.streams.kstream.internals.TimeWindow * @see org.apache.kafka.streams.kstream.internals.TimeWindow
* @see org.apache.kafka.streams.kstream.internals.SessionWindow * @see org.apache.kafka.streams.kstream.internals.SessionWindow
* @see org.apache.kafka.streams.kstream.internals.UnlimitedWindow * @see org.apache.kafka.streams.kstream.internals.UnlimitedWindow
* @see org.apache.kafka.streams.processor.TimestampExtractor * @see TimestampExtractor
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public abstract class Window { public abstract class Window {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.util.Map; import java.util.Map;
@ -25,14 +26,14 @@ import java.util.Map;
* maintain duration. * maintain duration.
* <p> * <p>
* If not explicitly specified, the default maintain duration is 1 day. * If not explicitly specified, the default maintain duration is 1 day.
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. * For time semantics, see {@link TimestampExtractor}.
* *
* @param <W> type of the window instance * @param <W> type of the window instance
* @see TimeWindows * @see TimeWindows
* @see UnlimitedWindows * @see UnlimitedWindows
* @see JoinWindows * @see JoinWindows
* @see SessionWindows * @see SessionWindows
* @see org.apache.kafka.streams.processor.TimestampExtractor * @see TimestampExtractor
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public abstract class Windows<W extends Window> { public abstract class Windows<W extends Window> {