MINOR: doc changes for KIP-372 (#5790)

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
Bill Bejeck 2018-10-15 22:01:21 -04:00 committed by Matthias J. Sax
parent 763b72c9a5
commit 8182c4fc7d
3 changed files with 28 additions and 15 deletions

View File

@ -551,7 +551,7 @@
<span class="c1">// When the key and/or value types do not match the configured</span> <span class="c1">// When the key and/or value types do not match the configured</span>
<span class="c1">// default serdes, we must explicitly specify serdes.</span> <span class="c1">// default serdes, we must explicitly specify serdes.</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span>
<span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span>
<span class="o">);</span> <span class="o">);</span>
@ -594,7 +594,7 @@
<span class="c1">// Group the stream by a new key and key type</span> <span class="c1">// Group the stream by a new key and key type</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
<span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span><span class="o">,</span> <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span><span class="o">,</span>
<span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span>
<span class="o">);</span> <span class="o">);</span>
@ -602,7 +602,7 @@
<span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span> <span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
<span class="n">KGroupedTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> <span class="n">KGroupedTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
<span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()),</span> <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()),</span>
<span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
<span class="o">);</span> <span class="o">);</span>
@ -618,7 +618,7 @@
<span class="k">return</span> <span class="n">value</span><span class="o">;</span> <span class="k">return</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span> <span class="o">}</span>
<span class="o">},</span> <span class="o">},</span>
<span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span>
<span class="o">);</span> <span class="o">);</span>
@ -631,7 +631,7 @@
<span class="k">return</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span> <span class="k">return</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
<span class="o">}</span> <span class="o">}</span>
<span class="o">},</span> <span class="o">},</span>
<span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
<span class="o">);</span> <span class="o">);</span>
@ -1330,7 +1330,7 @@
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="o">...;</span> <span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">wordCounts</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">wordCounts</span>
<span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span><span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">()));</span> <span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span><span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">()));</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">aggregated</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(</span> <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">aggregated</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="o">,</span> <span class="cm">/* initializer */</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="o">,</span> <span class="cm">/* initializer */</span>
@ -3355,18 +3355,18 @@ object WordCountApplication extends App {
} }
} }
</pre> </pre>
<p>In the above code snippet, we don't have to provide any SerDes, <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> explicitly. They will also not be dependent on any SerDes specified in the config. <strong>In fact all SerDes specified in the config will be ignored by the Scala APIs</strong>. All SerDes and <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be handled through implicit SerDes as discussed later in the <a href="#scala-dsl-implicit-serdes">Implicit SerDes</a> section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be flagged as a compile time error.</p> <p>In the above code snippet, we don't have to provide any SerDes, <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> explicitly. They will also not be dependent on any SerDes specified in the config. <strong>In fact all SerDes specified in the config will be ignored by the Scala APIs</strong>. All SerDes and <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be handled through implicit SerDes as discussed later in the <a href="#scala-dsl-implicit-serdes">Implicit SerDes</a> section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be flagged as a compile time error.</p>
</div> </div>
<div class="section" id="scala-dsl-implicit-serdes"> <div class="section" id="scala-dsl-implicit-serdes">
<span id="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><a class="toc-backref" href="#id29">Implicit SerDes</a><a class="headerlink" href="#scala-dsl-implicit-serdes" title="Permalink to this headline"></a></h3> <span id="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><a class="toc-backref" href="#id29">Implicit SerDes</a><a class="headerlink" href="#scala-dsl-implicit-serdes" title="Permalink to this headline"></a></h3>
<p>One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code>. And the user has to supply them every time through the with function of these classes.</p> <p>One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code>. And the user has to supply them every time through the with function of these classes.</p>
<p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p> <p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p>
<p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p> <p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
<p>Here's an example:</p> <p>Here's an example:</p>
<pre class="brush: scala;"> <pre class="brush: scala;">
// DefaultSerdes brings into scope implicit SerDes (mostly for primitives) // DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
// that will set up all Serialized, Produced, Consumed and Joined instances. // that will set up all Grouped, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will // So all APIs below that accept Grouped, Produced, Consumed or Joined will
// get these instances automatically // get these instances automatically
import Serdes._ import Serdes._
@ -3376,7 +3376,7 @@ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
// The following code fragment does not have a single instance of Serialized, // The following code fragment does not have a single instance of Grouped,
// Produced, Consumed or Joined supplied explicitly. // Produced, Consumed or Joined supplied explicitly.
// All of them are taken care of by the implicit SerDes imported by DefaultSerdes // All of them are taken care of by the implicit SerDes imported by DefaultSerdes
val clicksPerRegion: KTable[String, Long] = val clicksPerRegion: KTable[String, Long] =

View File

@ -136,7 +136,7 @@
<span class="c1">// Define the processing topology (here: WordCount)</span> <span class="c1">// Define the processing topology (here: WordCount)</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span> <span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span> <span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a key-value store named &quot;CountsKeyValueStore&quot; for the all-time word counts</span> <span class="c1">// Create a key-value store named &quot;CountsKeyValueStore&quot; for the all-time word counts</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;CountsKeyValueStore&quot;</span><span class="o">));</span> <span class="n">groupedByWord</span><span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;CountsKeyValueStore&quot;</span><span class="o">));</span>
@ -199,7 +199,7 @@
<span class="c1">// Define the processing topology (here: WordCount)</span> <span class="c1">// Define the processing topology (here: WordCount)</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span> <span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span> <span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a window state store named &quot;CountsWindowStore&quot; that contains the word counts for every minute</span> <span class="c1">// Create a window state store named &quot;CountsWindowStore&quot; that contains the word counts for every minute</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(<span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">60</span><span class="o">)))</span> <span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(<span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">60</span><span class="o">)))</span>
@ -392,7 +392,7 @@ interactive queries</span></p>
<span class="kd">final</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span> <span class="kd">final</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span> <span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span> <span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// This call to `count()` creates a state store named &quot;word-count&quot;.</span> <span class="c1">// This call to `count()` creates a state store named &quot;word-count&quot;.</span>
<span class="c1">// The state store is discoverable and can be queried interactively.</span> <span class="c1">// The state store is discoverable and can be queried interactively.</span>

View File

@ -77,6 +77,19 @@
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
</p> </p>
<p>
We've added a new class <code>Grouped</code> and deprecated <code>Serialized</code>. The intent of adding <code>Grouped</code> is the ability to
name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the
<code>Grouped#as()</code> method which takes a <code>String</code> and is used as part of the repartition topic name. The resulting repartition
topic name will still follow the pattern of <code>${application-id}-&gt;name&lt;-repartition</code>. The <code>Grouped</code> class is now favored over
<code>Serialized</code> in <code>KStream#groupByKey()</code>, <code>KStream#groupBy()</code>, and <code>KTable#groupBy()</code>.
Note that Kafka Streams does not automatically create repartition topics for aggregation operations.
Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition
topic naming, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>.
</p>
<p> <p>
We've added a new config named <code>max.task.idle.ms</code> to allow users specify how to handle out-of-order data within a task that may be processing multiple We've added a new config named <code>max.task.idle.ms</code> to allow users specify how to handle out-of-order data within a task that may be processing multiple
topic-partitions (see <a href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order Handling</a> section for more details). topic-partitions (see <a href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order Handling</a> section for more details).