KAFKA-6354: Update KStream JavaDoc using new State Store API (#4456)

Updates KStream JavaDoc and web page documentations using new State Store API

Author: Yu Liu <yu.liu003@gmail.com>

Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Yu 2018-02-02 20:24:20 +01:00 committed by Matthias J. Sax
parent 8b60c8b141
commit f6bbec4af2
4 changed files with 24 additions and 32 deletions

View File

@ -209,22 +209,18 @@
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span>
<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;persistent-counts&quot;.</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="c1">// Note: The `Stores` factory returns a supplier for the state store,</span>
<span class="c1">// because that&#39;s what you typically need to pass as API parameter.</span>
<span class="n">StateStoreSupplier</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">withKeys</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>
<span class="o">.</span><span class="na">withValues</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
<span class="o">.</span><span class="na">persistent</span><span class="o">()</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
</pre></div>
</div>
<p class="last">See
<a class="reference external" href="../javadocs/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">PersistentKeyValueFactory</a> for
detailed factory options.</p>
</td>
</tr>
<tr class="row-odd"><td>In-memory
@ -242,22 +238,18 @@
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span>
<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
<span class="c1">// Note: The `Stores` factory returns a supplier for the state store,</span>
<span class="c1">// because that&#39;s what you typically need to pass as API parameter.</span>
<span class="n">StateStoreSupplier</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">withKeys</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>
<span class="o">.</span><span class="na">withValues</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
<span class="o">.</span><span class="na">inMemory</span><span class="o">()</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
</pre></div>
</div>
<p class="last">See
<a class="reference external" href="../javadocs/org/apache/kafka/streams/state/Stores.InMemoryKeyValueFactory.html">InMemoryKeyValueFactory</a> for
detailed factory options.</p>
</td>
</tr>
</tbody>
@ -332,8 +324,8 @@
The primary interface to implement for the store is
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p>
<p>You also need to provide a &#8220;factory&#8221; for the store by implementing the
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStoreSupplier</span></code> interface, which Kafka Streams uses to create instances of
<p>You also need to provide a &#8220;builder&#8221; for the store by implementing the
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of
your store.</p>
</div>
</div>

View File

@ -131,7 +131,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* Use {@link org.apache.kafka.streams.processor.StateStore#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = storeSupplier.name();
@ -154,7 +154,7 @@ public interface KGroupedStream<K, V> {
* Count the number of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* provided by the given {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@ -576,7 +576,7 @@ public interface KGroupedStream<K, V> {
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* provided by the given {@code materialized}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
@ -592,7 +592,7 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
* Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
* max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to

View File

@ -230,7 +230,7 @@ public interface SessionWindowedKStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
* Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
* sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to

View File

@ -55,7 +55,7 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
/**
* For a period of time we will have 2 store hierarchies. 1 which is built by a
* {@link org.apache.kafka.streams.processor.StateStoreSupplier} where the outer most store will be of user defined
* {@link org.apache.kafka.streams.state.StoreSupplier} where the outer most store will be of user defined
* type, i.e, &lt;String,Integer&gt;, and another where the outermost store will be of type &lt;Bytes,byte[]&gt;
* This interface is so we don't need to have 2 complete implementations for collecting the metrics, rather
* we just provide an instance of this to do the type conversions from the outer store types to the inner store types.