MINOR: updates docs for KIP-358 (#5796)

Reviewers: Guozhang Wang <guozhang@confluent.io>, Jim Galasyn <jim.galasyn@confluent.io>
This commit is contained in:
Matthias J. Sax 2018-10-15 17:22:03 -07:00 committed by GitHub
parent 4c602e6130
commit 0b417b8331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 46 deletions

View File

@ -1021,13 +1021,13 @@
<p>The windowed <code class="docutils literal"><span class="pre">aggregate</span></code> turns a <code class="docutils literal"><span class="pre">TimeWindowedKStream&lt;K,</span> <span class="pre">V&gt;</span></code> or <code class="docutils literal"><span class="pre">SessionWindowdKStream&lt;K,</span> <span class="pre">V&gt;</span></code>
into a windowed <code class="docutils literal"><span class="pre">KTable&lt;Windowed&lt;K&gt;,</span> <span class="pre">V&gt;</span></code>.</p>
<p>Several variants of <code class="docutils literal"><span class="pre">aggregate</span></code> exist, see Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ examples, using lambda expressions</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
@ -1035,7 +1035,7 @@
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)).</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)).</span>
<span class="n">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
@ -1046,7 +1046,7 @@
<span class="c1">// Java 7 examples</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Initializer</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="cm">/* initializer */</span>
<span class="nd">@Override</span>
@ -1064,7 +1064,7 @@
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)).</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o"><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)).</span>
<span class="n">aggregate</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Initializer</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="cm">/* initializer */</span>
<span class="nd">@Override</span>
@ -1144,17 +1144,17 @@
<p>The windowed <code class="docutils literal"><span class="pre">count</span></code> turns a <code class="docutils literal"><span class="pre">TimeWindowedKStream&lt;K,</span> <span class="pre">V&gt;</span></code> or <code class="docutils literal"><span class="pre">SessionWindowedKStream&lt;K,</span> <span class="pre">V&gt;</span></code>
into a windowed <code class="docutils literal"><span class="pre">KTable&lt;Windowed&lt;K&gt;,</span> <span class="pre">V&gt;</span></code>.</p>
<p>Several variants of <code class="docutils literal"><span class="pre">count</span></code> exist, see Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based window */</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based window */</span>
<span class="o">.</span><span class="na">count</span><span class="o">();</span>
<span class="c1">// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="o">.</span><span class="na">count</span><span class="o">();</span>
</pre></div>
</div>
@ -1262,21 +1262,21 @@
<p>The windowed <code class="docutils literal"><span class="pre">reduce</span></code> turns a turns a <code class="docutils literal"><span class="pre">TimeWindowedKStream&lt;K,</span> <span class="pre">V&gt;</span></code> or a <code class="docutils literal"><span class="pre">SessionWindowedKStream&lt;K,</span> <span class="pre">V&gt;</span></code>
into a windowed <code class="docutils literal"><span class="pre">KTable&lt;Windowed&lt;K&gt;,</span> <span class="pre">V&gt;</span></code>.</p>
<p>Several variants of <code class="docutils literal"><span class="pre">reduce</span></code> exist, see Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ examples, using lambda expressions</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based window */</span><span class="o">)</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based window */</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
<span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
<span class="o">);</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionzedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
<span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
<span class="o">);</span>
@ -1286,7 +1286,7 @@
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">..</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based window */</span><span class="o">)</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based window */</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Reducer</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="cm">/* adder */</span>
<span class="nd">@Override</span>
@ -1297,7 +1297,7 @@
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Reducer</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="cm">/* adder */</span>
<span class="nd">@Override</span>
@ -1761,14 +1761,14 @@
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <a class="reference internal" href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <code class="docutils literal"><span class="pre">join</span></code> exists, see the Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">left</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">left</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span class="o">,</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">,</span> <span class="cm">/* ValueJoiner */</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -1783,7 +1783,7 @@
<span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -1820,14 +1820,14 @@
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <a class="reference internal" href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <code class="docutils literal"><span class="pre">leftJoin</span></code> exists, see the Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">left</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">left</span><span class="o">.</span><span class="na">leftJoin</span><span class="o">(</span><span class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span class="o">,</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">,</span> <span class="cm">/* ValueJoiner */</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -1842,7 +1842,7 @@
<span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -1882,14 +1882,14 @@
<p><strong>Data must be co-partitioned</strong>: The input data for both sides must be <a class="reference internal" href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned).</strong></p>
<p>Several variants of <code class="docutils literal"><span class="pre">outerJoin</span></code> exists, see the Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">left</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">left</span><span class="o">.</span><span class="na">outerJoin</span><span class="o">(</span><span class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span class="o">,</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">,</span> <span class="cm">/* ValueJoiner */</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -1904,7 +1904,7 @@
<span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">JoinWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span> <span class="cm">/* left value */</span>
@ -2804,7 +2804,7 @@
Old records in the state store are purged after the specified
<a class="reference internal" href="../core-concepts.html#streams_concepts_windowing"><span class="std std-ref">window retention period</span></a>.
Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be
changed via <code class="docutils literal"><span class="pre">Windows#until()</span></code> and <code class="docutils literal"><span class="pre">SessionWindows#until()</span></code>.</p>
changed via <code class="docutils literal"><span class="pre">Materialized#withRetention()</span></code>.</p>
<p>The DSL supports the following types of windows:</p>
<table border="1" class="docutils">
<colgroup>
@ -2857,12 +2857,12 @@ become t=300,000).</span></p>
windows with a size of 5000ms have predictable window boundaries <code class="docutils literal"><span class="pre">[0;5000),[5000;10000),...</span></code> &#8212; and <strong>not</strong>
<code class="docutils literal"><span class="pre">[1000;6000),[6000;11000),...</span></code> or even something &#8220;random&#8221; like <code class="docutils literal"><span class="pre">[1452;6452),[6452;11452),...</span></code>.</p>
<p>The following code defines a tumbling window with a size of 5 minutes:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.TimeWindows</span><span class="o">;</span>
<span class="c1">// A tumbling time window with a size of 5 minutes (and, by definition, an implicit</span>
<span class="c1">// advance interval of 5 minutes).</span>
<span class="kt">long</span> <span class="n">windowSizeMs</span> <span class="o">=</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span> <span class="c1">// 5 * 60 * 1000L</span>
<span class="kt">Duration</span> <span class="n">windowSizeMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">);</span>
<span class="c1">// The above is equivalent to the following code:</span>
@ -2884,13 +2884,13 @@ become t=300,000).</span></p>
terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows.</p>
</div>
<p>The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.TimeWindows</span><span class="o">;</span>
<span class="c1">// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.</span>
<span class="c1">// The window&#39;s name -- the string parameter -- is used to e.g. name the backing state store.</span>
<span class="kt">long</span> <span class="n">windowSizeMs</span> <span class="o">=</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span> <span class="c1">// 5 * 60 * 1000L</span>
<span class="kt">long</span> <span class="n">advanceMs</span> <span class="o">=</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> <span class="c1">// 1 * 60 * 1000L</span>
<span class="kt">Duration</span> <span class="n">windowSizeMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
<span class="kt">Duration</span> <span class="n">advanceMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">advanceBy</span><span class="o">(</span><span class="n">advanceMs</span><span class="o">);</span>
</pre></div>
</div>
@ -2938,11 +2938,11 @@ milliseconds (e.g. t=5 would become t=300,000).</span></p>
simple metrics (e.g. count of user visits on a news website or social platform) to more complex metrics (e.g. customer
conversion funnel and event flows).</p>
<p>The following code defines a session window with an inactivity gap of 5 minutes:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.SessionWindows</span><span class="o">;</span>
<span class="c1">// A session window with an inactivity gap of 5 minutes.</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">));</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">));</span>
</pre></div>
</div>
<p>Given the previous session window example, here&#8217;s what would happen on an input stream of six records.
@ -3320,8 +3320,8 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<p>Here's an example of the classic WordCount program that uses the Scala <code class="docutils literal"><span class="pre">StreamsBuilder</span></code> that builds an instance of <code class="docutils literal"><span class="pre">KStream</span></code> which is a wrapper around Java <code class="docutils literal"><span class="pre">KStream</span></code>. Then we reify to a table and get a <code class="docutils literal"><span class="pre">KTable</span></code>, which, again is a wrapper around Java <code class="docutils literal"><span class="pre">KTable</span></code>.</p>
<p>The net result is that the following code is structured just like using the Java API, but with Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage is more obvious when given an example. Below is an example WordCount implementation that will be used to demonstrate the differences between the Scala and Java API.</p>
<pre class="brush: scala;">
import java.time.Duration
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
@ -3351,7 +3351,7 @@ object WordCountApplication extends App {
streams.start()
sys.ShutdownHookThread {
streams.close(10, TimeUnit.SECONDS)
streams.close(Duration.ofSeconds(10))
}
}
</pre>

View File

@ -202,7 +202,7 @@
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a window state store named &quot;CountsWindowStore&quot; that contains the word counts for every minute</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">60000</span><span class="o">))</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(<span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">60</span><span class="o">)))</span>
<span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;CountsWindowStore&quot;</span><span class="o">));</span>
</pre></div>
</div>
@ -213,8 +213,8 @@
<span class="c1">// Fetch values for the key &quot;world&quot; for all of the windows available in this application instance.</span>
<span class="c1">// To get *all* available windows we fetch windows from the beginning of time until now.</span>
<span class="kt">long</span> <span class="n">timeFrom</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="c1">// beginning of time = oldest available</span>
<span class="kt">long</span> <span class="n">timeTo</span> <span class="o">=</span> <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">();</span> <span class="c1">// now (in processing-time)</span>
<span class="kt">Instant</span> <span class="n">timeFrom</span> <span class="o">=</span> <span class="na">Instant</span><span class="o">.</span><span class="na">ofEpochMilli<span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// beginning of time = oldest available</span>
<span class="kt">Instant</span> <span class="n">timeTo</span> <span class="o">=</span> <span class="n">Instant</span><span class="o">.</span><span class="na">now</span><span class="o">();</span> <span class="c1">// now (in processing-time)</span>
<span class="n">WindowStoreIterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">iterator</span> <span class="o">=</span> <span class="n">windowStore</span><span class="o">.</span><span class="na">fetch</span><span class="o">(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="n">timeFrom</span><span class="o">,</span> <span class="n">timeTo</span><span class="o">);</span>
<span class="k">while</span> <span class="o">(</span><span class="n">iterator</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
@ -295,8 +295,8 @@
<span class="kd">private</span> <span class="kd">final</span> <span class="n">StateStoreProvider</span> <span class="n">provider</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">CustomStoreTypeWrapper</span><span class="o">(</span><span class="kd">final</span> <span class="n">StateStoreProvider</span> <span class="n">provider</span><span class="o">,</span>
<span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span>
<span class="kd">final</span> <span class="n">QueryableStoreType</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">customStoreType</span><span class="o">)</span> <span class="o">{</span>
<span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span>
<span class="kd">final</span> <span class="n">QueryableStoreType</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">customStoreType</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// ... assign fields ...</span>
<span class="o">}</span>

View File

@ -132,8 +132,8 @@
<span class="c1">// retrieve the key-value store named &quot;Counts&quot;</span>
<span class="n">kvStore</span> <span class="o">=</span> <span class="o">(</span><span class="n">KeyValueStore</span><span class="o">)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStateStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">);</span>
<span class="c1">// schedule a punctuate() method every 1000 milliseconds based on stream-time</span>
<span class="k">this</span><span class="o">.</span><span class="na">context</span><span class="o">.</span><span class="na">schedule</span><span class="o">(</span><span class="mi">1000</span><span class="o">,</span> <span class="n">PunctuationType</span><span class="o">.</span><span class="na">STREAM_TIME</span><span class="o">,</span> <span class="o">(</span><span class="n">timestamp</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// schedule a punctuate() method every second based on stream-time</span>
<span class="k">this</span><span class="o">.</span><span class="na">context</span><span class="o">.</span><span class="na">schedule</span><span class="o">(</span><span class="na">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">1000</span><span class="o">),</span> <span class="n">PunctuationType</span><span class="o">.</span><span class="na">STREAM_TIME</span><span class="o">,</span> <span class="o">(</span><span class="n">timestamp</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">iter</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span class="na">all</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(</span><span class="n">iter</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">entry</span> <span class="o">=</span> <span class="n">iter</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
@ -365,11 +365,11 @@
<code class="docutils literal"><span class="pre">headers</span></code>.</p>
<p>Here is an example implementation of how to add a new header to the record:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">public void process(String key, String value) {</span>
<span class="c1">// add a header to the elements</span>
<span class="n">context()</span><span class="o">.</span><span class="na">headers</span><span class="o">()</span><span class="o">.</span><span class="na">add</span><span class="o">.</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">,</span> <span class="s">&quot;key&quot;</span>
<span class="o">}</span>
</pre></div>
<span class="c1">// add a header to the elements</span>
<span class="n">context()</span><span class="o">.</span><span class="na">headers</span><span class="o">()</span><span class="o">.</span><span class="na">add</span><span class="o">.</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">,</span> <span class="s">&quot;key&quot;</span>
<span class="o">}</span>
</pre></div>
</div>
<div class="section" id="connecting-processors-and-state-stores">
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2>

View File

@ -91,7 +91,28 @@
</p>
<p>
We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that you can use via <code>Serdes.UUID()</code> (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization">KIP-206</a>).
We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that you can use via <code>Serdes.UUID()</code>
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization">KIP-206</a>).
</p>
<p>
We updated a list of methods that take <code>long</long> arguments as either timestamp (fix point) or duration (time period)
and replaced them with <code>Instant</code> and <code>Duration</code> parameters for improved semantics.
Some old methods base on <code>long</code> are deprecated and users are encouraged to update their code.
<br />
In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take <code>Duration</code>
arguments to specify window size, hop, and gap parameters.
Also, window sizes and retention times are now specified as <code>Duration</code> type in <code>Stores</code> class.
The <code>Window</code> class has new methods <code>#startTime()</code> and <code>#endTime</code> that return window start/end timestamp as <code>Instant</code>.
For interactive queries, there are new <code>#fetch(...)</code> overloads taking <code>Instant</code> arguments.
Additionally, punctuations are now registerd via <code>ProcessorContext#schedule(Duration interval, ...)</code>.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
</p>
<p>
We deprecated <code>KafkaStreams#close(...)</code> and replaced it with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout argument
Note: the new <code>#close</code> method has improved (but slightly different) semantics than the old one.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
</p>
<h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.time.Duration;
import java.util.Map;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
@ -57,7 +58,7 @@ public abstract class Windows<W extends Window> {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is negative
* @deprecated since 2.1. Use {@link Materialized#withRetention(long)}
* @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)}
* or directly configure the retention in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
@Deprecated