KAFKA-12562: Fix KafkaStreams#store old references in comments (#13774)

Following method was deprecated in 2.5 and was removed in 3.0.0.

// KafkaStreams.java
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);

However, many comments reference the removed method which can be confusing in generated JavaDocs. The code in java doc comments has been changed to reflect the new method, store(final StoreQueryParameters<T> storeQueryParameters).

Also, minor changes to variable names in java doc to be context specific.

Reviewer: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Milind Mantri 2023-06-01 12:46:00 +05:30 committed by GitHub
parent 560ab2cc31
commit 4b46bb4904
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 86 additions and 47 deletions

View File

@ -243,7 +243,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... * KafkaStreams streams = ...
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -432,7 +433,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... * KafkaStreams streams = ...
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); * ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre> * }</pre>
@ -476,7 +478,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... * KafkaStreams streams = ...
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); * ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre> * }</pre>

View File

@ -86,7 +86,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -136,7 +137,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -187,7 +189,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -240,7 +243,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>

View File

@ -120,7 +120,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -166,7 +167,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -270,7 +272,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -334,7 +337,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -439,7 +443,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -498,7 +503,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double * KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, ValueAndTimestamp<VR>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
* K key = "some-key"; * K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>

View File

@ -59,7 +59,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -102,7 +103,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -230,7 +232,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -303,7 +306,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -441,7 +445,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -525,7 +530,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>

View File

@ -54,7 +54,8 @@ import java.util.function.Function;
* streams.start() * streams.start()
* ... * ...
* final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable * final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
* ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); * final StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view = streams.store(storeQueryParams);
* view.get(key); * view.get(key);
*}</pre> *}</pre>
*<p> *<p>
@ -135,7 +136,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // filtering words * KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -174,7 +176,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // filtering words * KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -260,7 +263,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // filtering words * KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
@ -298,7 +302,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}: * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // filtering words * KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>

View File

@ -174,7 +174,8 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore()); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(storeQueryParams);
* *
* String key = "some-word"; * String key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -234,7 +235,8 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore()); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>

View File

@ -134,9 +134,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.
@ -180,9 +181,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.
@ -338,9 +340,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore()); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.
@ -399,9 +402,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double * KafkaStreams streams = ... // some windowed aggregation on value type double
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore()); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.
@ -557,9 +561,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.
@ -617,9 +622,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // compute sum * KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Sting queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * StoreQueryParameters<ReadOnlySessionStore<String, Long>> storeQueryParams = StoreQueryParameters.fromNameAndType(QueryableStoreTypes.sessionStore());
* ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeQueryParams);
* String key = "some-key"; * String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre> * }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application. * query the value of the key on a parallel running instance of your Kafka Streams application.

View File

@ -164,8 +164,8 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
* long toTime = ...; * long toTime = ...;
@ -221,7 +221,8 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;

View File

@ -134,7 +134,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -183,7 +184,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -334,7 +336,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -395,7 +398,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -555,7 +559,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;
@ -618,7 +623,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code * <pre>{@code
* KafkaStreams streams = ... // counting words * KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore()); * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedWindowStore());
* ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(storeQueryParams);
* *
* K key = "some-word"; * K key = "some-word";
* long fromTime = ...; * long fromTime = ...;